This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b0a7e2e01ccc [SPARK-51685][SS] Excessive Info logging from RocksDB operations causing too big executor stderr files
b0a7e2e01ccc is described below

commit b0a7e2e01ccc294d1531b95b17b35e5ead5a66aa
Author: vinodkc <vinod.kc...@gmail.com>
AuthorDate: Fri Apr 4 10:25:14 2025 +0900

    [SPARK-51685][SS] Excessive Info logging from RocksDB operations causing too big executor stderr files
    
    ### What changes were proposed in this pull request?
    
    Long-running Structured Streaming applications that use the RocksDB state store eventually fail with a "No space left on device" error.
    Inspecting the executor logs showed an enormous log volume: around 1 GB of data per minute of runtime.
    Individual info-level log entries from the `RocksDBFileManager` and `RocksDB` classes were printing more than 4000 lines each.
    
    ### Why are the changes needed?
    
    These info logs print every checkpoint file path. Such verbose info logs eat into the streaming application's disk space, since most users keep INFO as the default log level.
    So it is better to log such details at the debug level.
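
    As a side note, Spark's logging helpers take the log message by name (see the by-name `( => LogEntry) => Unit` signature in the RocksDB.scala hunk below), so the downgrade also means these huge strings are never even built unless DEBUG is enabled. A minimal sketch of that mechanism in plain Scala (not Spark code; every name below is illustrative):

        object LazyLoggingSketch {
          var debugEnabled = false

          // By-name parameter: `msg` is evaluated only if the level is enabled.
          def logDebug(msg: => String): Unit =
            if (debugEnabled) println(s"DEBUG $msg")

          def main(args: Array[String]): Unit = {
            def expensiveListing: String = {
              println("building a multi-thousand-line message...")
              (1 to 4000).map(i => s"/checkpoint/file-$i.sst").mkString("\n\t")
            }
            logDebug(expensiveListing) // DEBUG off: the listing is never built
            debugEnabled = true
            logDebug(expensiveListing) // DEBUG on: built and printed
          }
        }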
    
    ### Does this PR introduce _any_ user-facing change?
    
    No; some verbose info-level log entries were changed to the debug level.
    
    ### How was this patch tested?
    
    Manually verified the executor logs at the INFO and DEBUG log levels.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #50483 from vinodkc/br_fix_excessive_log.
    
    Authored-by: vinodkc <vinod.kc...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 core/src/main/scala/org/apache/spark/util/Utils.scala             | 2 +-
 .../org/apache/spark/sql/execution/streaming/state/RocksDB.scala  | 5 +++--
 .../spark/sql/execution/streaming/state/RocksDBFileManager.scala  | 8 ++++----
 3 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1dcc91af862e..ea9b742fb2e1 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -3085,7 +3085,7 @@ private[spark] object Utils
         entry = in.getNextEntry()
       }
       in.close() // so that any error in closing does not get ignored
-      logInfo(log"Unzipped from ${MDC(PATH, dfsZipFile)}\n\t${MDC(PATHS, files.mkString("\n\t"))}")
+      logDebug(log"Unzipped from ${MDC(PATH, dfsZipFile)}\n\t${MDC(PATHS, files.mkString("\n\t"))}")
     } finally {
       // Close everything no matter what happened
       IOUtils.closeQuietly(in)
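
For contrast, a hedged sketch of an alternative pattern the patch does not use here: keeping a one-line summary at INFO while only the per-file listing drops to DEBUG. Plain Scala, all names and values illustrative:

    object SummaryVsDetailSketch {
      var debugEnabled = false

      // Illustrative stand-ins for a logging API; by-name messages mean a
      // suppressed level never pays to build its string.
      def logInfo(msg: => String): Unit  = println(s"INFO  $msg")
      def logDebug(msg: => String): Unit = if (debugEnabled) println(s"DEBUG $msg")

      def main(args: Array[String]): Unit = {
        val dfsZipFile = "hdfs://nn/checkpoints/state/1.zip"        // illustrative
        val files = (1 to 4).map(i => s"/tmp/unzipped/file-$i.sst") // illustrative
        logInfo(s"Unzipped ${files.length} files from $dfsZipFile") // cheap summary
        logDebug(s"Unzipped from $dfsZipFile\n\t${files.mkString("\n\t")}")
      }
    }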
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 85bb77b7afb3..b324dc73f529 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -1487,11 +1487,12 @@ class RocksDB(
       override def log(infoLogLevel: InfoLogLevel, logMsg: String) = {
         // Map DB log level to log4j levels
         // Warn is mapped to info because RocksDB warn is too verbose
+        // Info is mapped to debug because RocksDB info is too verbose
         // (e.g. dumps non-warning stuff like stats)
         val loggingFunc: ( => LogEntry) => Unit = infoLogLevel match {
          case InfoLogLevel.FATAL_LEVEL | InfoLogLevel.ERROR_LEVEL => logError(_)
-          case InfoLogLevel.WARN_LEVEL | InfoLogLevel.INFO_LEVEL => logInfo(_)
-          case InfoLogLevel.DEBUG_LEVEL => logDebug(_)
+          case InfoLogLevel.WARN_LEVEL => logInfo(_)
+          case InfoLogLevel.INFO_LEVEL | InfoLogLevel.DEBUG_LEVEL => logDebug(_)
           case _ => logTrace(_)
         }
         loggingFunc(log"[NativeRocksDB-${MDC(LogKeys.ROCKS_DB_LOG_LEVEL, infoLogLevel.getValue)}]" +
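
For reference, a minimal standalone sketch of the level mapping this hunk lands on, assuming the RocksDB JNI enum org.rocksdb.InfoLogLevel is on the classpath; the println-based stand-ins below are illustrative, not Spark's logging API:

    import org.rocksdb.InfoLogLevel

    object NativeLogLevelMappingSketch {
      // Illustrative stand-ins for Spark's logError/logInfo/logDebug/logTrace.
      def logError(msg: => String): Unit = println(s"ERROR $msg")
      def logInfo(msg: => String): Unit  = println(s"INFO  $msg")
      def logDebug(msg: => String): Unit = ()  // suppressed at the default level
      def logTrace(msg: => String): Unit = ()

      // The new mapping: native WARN still surfaces at INFO, while native
      // INFO now drops to DEBUG alongside native DEBUG.
      def loggingFunc(level: InfoLogLevel): (=> String) => Unit = level match {
        case InfoLogLevel.FATAL_LEVEL | InfoLogLevel.ERROR_LEVEL => logError(_)
        case InfoLogLevel.WARN_LEVEL => logInfo(_)
        case InfoLogLevel.INFO_LEVEL | InfoLogLevel.DEBUG_LEVEL => logDebug(_)
        case _ => logTrace(_)
      }

      def main(args: Array[String]): Unit = {
        loggingFunc(InfoLogLevel.WARN_LEVEL)("compaction stall warning") // printed
        loggingFunc(InfoLogLevel.INFO_LEVEL)("periodic stats dump")      // dropped
      }
    }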
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 852ddb7b9916..a73835973feb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -291,7 +291,7 @@ class RocksDBFileManager(
       colFamilyIdMapping, colFamilyTypeMapping, maxColumnFamilyId)
     val metadataFile = localMetadataFile(checkpointDir)
     metadata.writeToFile(metadataFile)
-    logInfo(log"Written metadata for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" +
+    logDebug(log"Written metadata for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" +
       log"${MDC(LogKeys.METADATA_JSON, metadata.prettyJson)}")
 
     if (version <= 1 && numKeys <= 0) {
@@ -341,7 +341,7 @@ class RocksDBFileManager(
       // Copy the necessary immutable files
       val metadataFile = localMetadataFile(localDir)
       val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
-      logInfo(log"Read metadata for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" +
+      logDebug(log"Read metadata for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" +
         log"${MDC(LogKeys.METADATA_JSON, metadata.prettyJson)}")
       loadImmutableFilesFromDfs(metadata.immutableFiles, localDir, rocksDBFileMapping, version)
       versionToRocksDBFiles.put((version, checkpointUniqueId), metadata.immutableFiles)
@@ -845,7 +845,7 @@ class RocksDBFileManager(
         totalBytes += bytes
       }
       zout.close()  // so that any error in closing also cancels the output stream
-      logInfo(log"Zipped ${MDC(LogKeys.NUM_BYTES, totalBytes)} bytes (before compression) to " +
+      logDebug(log"Zipped ${MDC(LogKeys.NUM_BYTES, totalBytes)} bytes (before compression) to " +
         log"${MDC(LogKeys.FILE_NAME, filesStr)}")
       // The other fields saveCheckpointMetrics should have been filled
       saveCheckpointMetrics =
@@ -868,7 +868,7 @@ class RocksDBFileManager(
     lazy val files = Option(Utils.recursiveList(dir)).getOrElse(Array.empty).map { f =>
       s"${f.getAbsolutePath} - ${f.length()} bytes"
     }
-    logInfo(msg + log" - ${MDC(LogKeys.NUM_FILES, files.length)} files\n\t" +
+    logDebug(msg + log" - ${MDC(LogKeys.NUM_FILES, files.length)} files\n\t" +
       log"${MDC(LogKeys.FILE_NAME, files.mkString("\n\t"))}")
   }
 


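To make the scale concrete, here is a hedged sketch of the kind of per-file report the last hunk downgrades. Plain Scala: describeFiles and its walk helper are illustrative stand-ins for Utils.recursiveList and the patched method, not Spark code:

    import java.io.File

    object FileReportSketch {
      // Recursively collect regular files under `dir`.
      private def walk(f: File): Array[File] =
        if (f.isDirectory) Option(f.listFiles()).getOrElse(Array.empty[File]).flatMap(walk)
        else Array(f)

      // One "<path> - <n> bytes" line per file: with the thousands of SST files
      // in a RocksDB checkpoint, a single such message runs to thousands of
      // lines, which is why it now logs at DEBUG rather than INFO.
      def describeFiles(dir: File): String = {
        val files = walk(dir).map(f => s"${f.getAbsolutePath} - ${f.length()} bytes")
        s"${files.length} files\n\t${files.mkString("\n\t")}"
      }

      def main(args: Array[String]): Unit =
        println(describeFiles(new File(args.headOption.getOrElse("."))))
    }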