This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 75805f07f5ca [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log
75805f07f5ca is described below
commit 75805f07f5caeb01104a7352b02790d03a043ded
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sat Dec 9 15:20:55 2023 -0800
[SPARK-46339][SS] Directory with batch number name should not be treated as metadata log
### What changes were proposed in this pull request?
This patch updates the documentation of the `CheckpointFileManager.list` method to reflect the fact that it returns both files and directories, to reduce confusion.
For usages like `HDFSMetadataLog`, which assumes that the file statuses returned by `list` are all files, we add a filter there to avoid a confusing error.
### Why are the changes needed?
`HDFSMetadataLog` takes a metadata path as a parameter. When it retrieves the metadata of all batches, it calls `CheckpointFileManager.list` to get all files under the metadata path. However, all current implementations of `CheckpointFileManager.list` return all files and directories under the given path. So if there is a directory whose name is a batch number (a long value), the directory is returned too and causes an error when `HDFSMetadataLog` goes to read it.
Actually, the `CheckpointFileManager.list` method clearly documents that it lists the "files" in a path. That being said, current implementations don't follow the doc. We tried to make the `list` implementations return only files, but some usages (state metadata) of the `list` method already break that assumption and rely on the directories returned by `list`. So we simply update the `list` method documentation to explicitly state that it returns both files and directories. We add a filter in `HDFSMetad [...]
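The failure mode above can be illustrated with a minimal, self-contained sketch. This uses plain `java.io.File` rather than the actual `CheckpointFileManager`/`FileStatus` API, and the names `BatchListingSketch` and `listBatches` are invented for illustration; only the `isFile` filter mirrors the actual change to `HDFSMetadataLog.listBatches`.

```scala
import java.io.File
import java.nio.file.Files

object BatchListingSketch {
  // Mirrors the batch-file naming assumption: the entry name is a long batch id.
  def pathToBatchId(f: File): Long = f.getName.toLong

  // Lists batch ids under `dir`, optionally keeping only regular files,
  // analogous to the `.filter(f => f.isFile)` added in this patch.
  def listBatches(dir: File, filesOnly: Boolean): Seq[Long] = {
    val entries = dir.listFiles().toSeq
    val kept = if (filesOnly) entries.filter(_.isFile) else entries
    kept.map(pathToBatchId).sorted
  }

  def main(args: Array[String]): Unit = {
    val metadataDir = Files.createTempDirectory("metadata").toFile
    new File(metadataDir, "0").createNewFile() // a real batch file
    new File(metadataDir, "1").mkdir()         // a directory with a numeric name

    // Without the filter, the directory is mistaken for batch 1 and a later
    // attempt to read it as a batch file fails.
    println(listBatches(metadataDir, filesOnly = false)) // List(0, 1)
    // With the filter, only the real batch file remains.
    println(listBatches(metadataDir, filesOnly = true))  // List(0)
  }
}
```

The same shape applies to the real code: `fileManager.list` may return directory statuses, so `listBatches` must drop them before mapping names to batch ids.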
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a unit test.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #44272 from viirya/fix_metadatalog.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sql/execution/streaming/CheckpointFileManager.scala | 4 ++--
.../spark/sql/execution/streaming/HDFSMetadataLog.scala | 2 ++
.../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 12 ++++++++++++
3 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index af2c97b21138..34c5dee0997b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -65,10 +65,10 @@ trait CheckpointFileManager {
/** Open a file for reading, or throw exception if it does not exist. */
def open(path: Path): FSDataInputStream
- /** List the files in a path that match a filter. */
+ /** List the files/directories in a path that match a filter. */
def list(path: Path, filter: PathFilter): Array[FileStatus]
- /** List all the files in a path. */
+ /** List all the files/directories in a path. */
def list(path: Path): Array[FileStatus] = {
list(path, (_: Path) => true)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 79627030e1eb..b3eedbf93f04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -327,6 +327,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
/** List the available batches on file system. */
protected def listBatches: Array[Long] = {
val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+ // Batches must be files
+ .filter(f => f.isFile)
.map(f => pathToBatchId(f.getPath)) ++
// Iterate over keySet is not thread safe. We call `toArray` to make a copy in the lock to
// elimiate the race condition.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 980d532dd477..08f245135f58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -33,6 +33,18 @@ class HDFSMetadataLogSuite extends SharedSparkSession {
private implicit def toOption[A](a: A): Option[A] = Option(a)
+ test("SPARK-46339: Directory with number name should not be treated as metadata log") {
+ withTempDir { temp =>
+ val dir = new File(temp, "dir")
+ val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
+ assert(metadataLog.metadataPath.toString.endsWith("/dir"))
+
+ // Create a directory with batch id 0
+ new File(dir, "0").mkdir()
+ assert(metadataLog.getLatest() === None)
+ }
+ }
+
test("HDFSMetadataLog: basic") {
withTempDir { temp =>
val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]