This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b722b1c0d6f0 [SPARK-54170][CORE] Use StructuredLogging message in Scala side
b722b1c0d6f0 is described below
commit b722b1c0d6f06df519281ee7e37292ea221b2f4d
Author: Takuya Ueshin <[email protected]>
AuthorDate: Tue Nov 4 10:02:36 2025 -0800
[SPARK-54170][CORE] Use StructuredLogging message in Scala side
### What changes were proposed in this pull request?
Uses StructuredLogging messages on the Scala side.
### Why are the changes needed?
Follow-up of https://github.com/apache/spark/pull/52689.
The StructuredLogging framework should be used.
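For illustration, here is a minimal sketch of the before/after pattern this change applies. The wrapper class and method are hypothetical; the `logWarning` calls mirror the diff below, and the `log` interpolator and `MDC` come from Spark's structured-logging API in `org.apache.spark.internal`:

```scala
import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Hypothetical wrapper for illustration; the logWarning calls mirror
// the change made in PythonWorkerLogCapture.
class LogWriterCleanup extends Logging {
  def closeQuietly(writer: AutoCloseable, workerId: String): Unit = {
    try {
      writer.close()
    } catch {
      case e: Exception =>
        // Before: plain string interpolation; the worker id is only free text.
        // logWarning(s"Failed to close log writer for worker $workerId", e)

        // After: MDC tags the value with LogKeys.PYTHON_WORKER_ID, so the id
        // is carried as a structured field alongside the message text.
        logWarning(
          log"Failed to close log writer for worker ${MDC(LogKeys.PYTHON_WORKER_ID, workerId)}",
          e)
    }
  }
}
```

With structured logging enabled (`spark.log.structuredLogging.enabled=true`), the tagged value appears as a separate field in the JSON log record, so log pipelines can filter on the key instead of pattern-matching the message text.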
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52864 from ueshin/issues/SPARK-54170/mdc.
Authored-by: Takuya Ueshin <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../src/main/java/org/apache/spark/internal/LogKeys.java | 1 +
.../apache/spark/api/python/PythonWorkerLogCapture.scala | 15 ++++++++++-----
2 files changed, 11 insertions(+), 5 deletions(-)
diff --git a/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java b/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java
index 8b6d3614b86d..48bc6f201bc7 100644
--- a/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java
+++ b/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java
@@ -614,6 +614,7 @@ public enum LogKeys implements LogKey {
   PYTHON_WORKER_CHANNEL_IS_BLOCKING_MODE,
   PYTHON_WORKER_CHANNEL_IS_CONNECTED,
   PYTHON_WORKER_HAS_INPUTS,
+  PYTHON_WORKER_ID,
   PYTHON_WORKER_IDLE_TIMEOUT,
   PYTHON_WORKER_IS_ALIVE,
   PYTHON_WORKER_MODULE,
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerLogCapture.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerLogCapture.scala
index 71fc00546ef6..a2a7c5ea1451 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerLogCapture.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerLogCapture.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.jdk.CollectionConverters._
 import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.storage.{PythonWorkerLogBlockIdGenerator,
   PythonWorkerLogLine, RollingLogWriter}
 /**
@@ -64,7 +64,9 @@ private[python] class PythonWorkerLogCapture(
         writer.close()
       } catch {
         case e: Exception =>
-          logWarning(s"Failed to close log writer for worker $workerId", e)
+          logWarning(
+            log"Failed to close log writer for worker ${MDC(LogKeys.PYTHON_WORKER_ID, workerId)}",
+            e)
       }
     }
   }
@@ -73,12 +75,14 @@ private[python] class PythonWorkerLogCapture(
    * Closes all active worker log writers.
    */
   def closeAllWriters(): Unit = {
-    workerLogWriters.values().asScala.foreach { case (writer, _) =>
+    workerLogWriters.asScala.foreach { case (workerId, (writer, _)) =>
       try {
         writer.close()
       } catch {
         case e: Exception =>
-          logWarning("Failed to close log writer", e)
+          logWarning(
+            log"Failed to close log writer for worker ${MDC(LogKeys.PYTHON_WORKER_ID, workerId)}",
+            e)
       }
     }
     workerLogWriters.clear()
@@ -128,7 +132,8 @@ private[python] class PythonWorkerLogCapture(
         }
       } catch {
         case e: Exception =>
-          logWarning(s"Failed to write log for worker $workerId", e)
+          logWarning(
+            log"Failed to write log for worker ${MDC(LogKeys.PYTHON_WORKER_ID, workerId)}", e)
       }
     }
     prefix
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]