This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new b0a7e2e01ccc [SPARK-51685][SS] Excessive INFO logging from RocksDB operations causing overly large executor stderr files
b0a7e2e01ccc is described below

commit b0a7e2e01ccc294d1531b95b17b35e5ead5a66aa
Author: vinodkc <vinod.kc...@gmail.com>
AuthorDate: Fri Apr 4 10:25:14 2025 +0900

    [SPARK-51685][SS] Excessive INFO logging from RocksDB operations causing overly large executor stderr files

    ### What changes were proposed in this pull request?

    Long-running Structured Streaming applications that use the RocksDB state store fail after some time with a "No space left on device" error. Inspecting the executor logs showed an enormous volume of log output: around 1 GB of data per minute of runtime. Individual INFO log entries from the `RocksDBFileManager` and `RocksDB` classes were printing more than 4000 lines each.

    ### Why are the changes needed?

    The INFO logs print every checkpoint file path. These verbose INFO logs eat into the streaming application's disk space, and most users keep INFO as the default log level, so it is better to log such details at DEBUG level.
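
    Operators who still need the detailed file listings occasionally can re-enable them per class rather than raising the global log level. A minimal sketch, assuming a log4j2.properties-based Spark configuration (the logger keys `rocksdb` and `rocksdbfm` are arbitrary labels chosen here; the class names are the ones touched by this patch):

    ```properties
    # Sketch: restore the detailed state-store logs at DEBUG for just these
    # two classes, while the root logger stays at INFO.
    logger.rocksdb.name = org.apache.spark.sql.execution.streaming.state.RocksDB
    logger.rocksdb.level = debug
    logger.rocksdbfm.name = org.apache.spark.sql.execution.streaming.state.RocksDBFileManager
    logger.rocksdbfm.level = debug
    ```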

    ### Does this PR introduce _any_ user-facing change?

    No. Some verbose INFO-level log entries were changed to DEBUG level.

    ### How was this patch tested?

    Manually verified the executor logs at the INFO and DEBUG log levels.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #50483 from vinodkc/br_fix_excessive_log.

    Authored-by: vinodkc <vinod.kc...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 core/src/main/scala/org/apache/spark/util/Utils.scala            | 2 +-
 .../org/apache/spark/sql/execution/streaming/state/RocksDB.scala | 5 +++--
 .../spark/sql/execution/streaming/state/RocksDBFileManager.scala | 8 ++++----
 3 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1dcc91af862e..ea9b742fb2e1 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -3085,7 +3085,7 @@ private[spark] object Utils
         entry = in.getNextEntry()
       }
       in.close() // so that any error in closing does not get ignored
-      logInfo(log"Unzipped from ${MDC(PATH, dfsZipFile)}\n\t${MDC(PATHS, files.mkString("\n\t"))}")
+      logDebug(log"Unzipped from ${MDC(PATH, dfsZipFile)}\n\t${MDC(PATHS, files.mkString("\n\t"))}")
     } finally {
       // Close everything no matter what happened
       IOUtils.closeQuietly(in)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 85bb77b7afb3..b324dc73f529 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -1487,11 +1487,12 @@ class RocksDB(
     override def log(infoLogLevel: InfoLogLevel, logMsg: String) = {
       // Map DB log level to log4j levels
       // Warn is mapped to info because RocksDB warn is too verbose
+      // Info is mapped to debug because RocksDB info is too verbose
       // (e.g. dumps non-warning stuff like stats)
       val loggingFunc: ( => LogEntry) => Unit = infoLogLevel match {
         case InfoLogLevel.FATAL_LEVEL | InfoLogLevel.ERROR_LEVEL => logError(_)
-        case InfoLogLevel.WARN_LEVEL | InfoLogLevel.INFO_LEVEL => logInfo(_)
-        case InfoLogLevel.DEBUG_LEVEL => logDebug(_)
+        case InfoLogLevel.WARN_LEVEL => logInfo(_)
+        case InfoLogLevel.INFO_LEVEL | InfoLogLevel.DEBUG_LEVEL => logDebug(_)
         case _ => logTrace(_)
       }
       loggingFunc(log"[NativeRocksDB-${MDC(LogKeys.ROCKS_DB_LOG_LEVEL, infoLogLevel.getValue)}]" +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 852ddb7b9916..a73835973feb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -291,7 +291,7 @@ class RocksDBFileManager(
       colFamilyIdMapping, colFamilyTypeMapping, maxColumnFamilyId)
     val metadataFile = localMetadataFile(checkpointDir)
     metadata.writeToFile(metadataFile)
-    logInfo(log"Written metadata for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" +
+    logDebug(log"Written metadata for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" +
       log"${MDC(LogKeys.METADATA_JSON, metadata.prettyJson)}")

     if (version <= 1 && numKeys <= 0) {
@@ -341,7 +341,7 @@ class RocksDBFileManager(
     // Copy the necessary immutable files
     val metadataFile = localMetadataFile(localDir)
     val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
-    logInfo(log"Read metadata for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" +
+    logDebug(log"Read metadata for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" +
       log"${MDC(LogKeys.METADATA_JSON, metadata.prettyJson)}")
     loadImmutableFilesFromDfs(metadata.immutableFiles, localDir, rocksDBFileMapping, version)
     versionToRocksDBFiles.put((version, checkpointUniqueId), metadata.immutableFiles)
@@ -845,7 +845,7 @@ class RocksDBFileManager(
       totalBytes += bytes
     }
     zout.close() // so that any error in closing also cancels the output stream
-    logInfo(log"Zipped ${MDC(LogKeys.NUM_BYTES, totalBytes)} bytes (before compression) to " +
+    logDebug(log"Zipped ${MDC(LogKeys.NUM_BYTES, totalBytes)} bytes (before compression) to " +
       log"${MDC(LogKeys.FILE_NAME, filesStr)}")
     // The other fields saveCheckpointMetrics should have been filled
     saveCheckpointMetrics =
@@ -868,7 +868,7 @@ class RocksDBFileManager(
     lazy val files = Option(Utils.recursiveList(dir)).getOrElse(Array.empty).map { f =>
       s"${f.getAbsolutePath} - ${f.length()} bytes"
     }
-    logInfo(msg + log" - ${MDC(LogKeys.NUM_FILES, files.length)} files\n\t" +
+    logDebug(msg + log" - ${MDC(LogKeys.NUM_FILES, files.length)} files\n\t" +
       log"${MDC(LogKeys.FILE_NAME, files.mkString("\n\t"))}")
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org