This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f9e468eafdf0 [SPARK-46399][CORE] Add exit status to the Application End event for the use of Spark Listener
f9e468eafdf0 is described below

commit f9e468eafdf0682a2353897677d8487e1c2ab613
Author: Reza Safi <[email protected]>
AuthorDate: Wed Dec 20 02:20:56 2023 -0600

    [SPARK-46399][CORE] Add exit status to the Application End event for the use of Spark Listener
    
    ### What changes were proposed in this pull request?
    Currently SparkListenerApplicationEnd only has a timestamp value, and there is no exit status recorded with it.
    This change adds an exit code to the SparkListenerApplicationEnd event.
    
    ### Why are the changes needed?
    Without this, it is hard to understand whether an attempt has failed or succeeded when using Spark listeners.
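    
    For illustration, a listener along these lines (a minimal hypothetical sketch; `ExitCodeListener` is not part of this change) can now distinguish the outcomes:
    
    ```scala
    import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
    
    // Hypothetical listener that reports how the application ended.
    class ExitCodeListener extends SparkListener {
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
        applicationEnd.exitCode match {
          case Some(0)    => println(s"Application succeeded at ${applicationEnd.time}")
          case Some(code) => println(s"Application failed with exit code $code")
          case None       => println("Application ended without a recorded exit code")
        }
      }
    }
    ```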
    
    ### Does this PR introduce _any_ user-facing change?
    No, the added exitCode is an optional parameter of the event.
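    
    Based on the `JsonProtocol` change below, the logged event would look roughly like this (values are illustrative); when no exit code is recorded, the `ExitCode` field is simply omitted, so existing consumers of the event log are unaffected:
    
    ```json
    {"Event":"SparkListenerApplicationEnd","Timestamp":1000,"ExitCode":0}
    ```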
    
    ### How was this patch tested?
    Locally using the existing tests. A new test is also added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #44340 from rezasafi/rezaaddstatuslistener.
    
    Authored-by: Reza Safi <[email protected]>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 core/src/main/scala/org/apache/spark/SparkContext.scala     |  6 +++---
 .../scala/org/apache/spark/scheduler/SparkListener.scala    |  4 +++-
 .../src/main/scala/org/apache/spark/util/JsonProtocol.scala |  4 +++-
 .../apache/spark/scheduler/EventLoggingListenerSuite.scala  | 13 +++++++++++--
 project/MimaExcludes.scala                                  |  3 +++
 5 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b43bb08d25a7..da37fa83254b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2273,7 +2273,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
     if (listenerBus != null) {
       Utils.tryLogNonFatalError {
-        postApplicationEnd()
+        postApplicationEnd(exitCode)
       }
     }
     Utils.tryLogNonFatalError {
@@ -2803,8 +2803,8 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /** Post the application end event */
-  private def postApplicationEnd(): Unit = {
-    listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis))
+  private def postApplicationEnd(exitCode: Int): Unit = {
+    listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis, Some(exitCode)))
   }
 
   /** Post the environment update event once the task scheduler is ready */
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index fd846545d689..cc19b71bfc4d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -289,7 +289,9 @@ case class SparkListenerApplicationStart(
     driverAttributes: Option[Map[String, String]] = None) extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
+case class SparkListenerApplicationEnd(
+    time: Long,
+    exitCode: Option[Int] = None) extends SparkListenerEvent
 
 /**
  * An internal class that describes the metadata of an event log.
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 987e5b4328f9..22dcf6c11e4b 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -277,6 +277,7 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeStartObject()
     g.writeStringField("Event", 
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationEnd)
     g.writeNumberField("Timestamp", applicationEnd.time)
+    applicationEnd.exitCode.foreach(exitCode => g.writeNumberField("ExitCode", exitCode))
     g.writeEndObject()
   }
 
@@ -1065,7 +1066,8 @@ private[spark] object JsonProtocol extends JsonUtils {
   }
 
   def applicationEndFromJson(json: JsonNode): SparkListenerApplicationEnd = {
-    SparkListenerApplicationEnd(json.get("Timestamp").extractLong)
+    val exitCode = jsonOption(json.get("ExitCode")).map(_.extractInt)
+    SparkListenerApplicationEnd(json.get("Timestamp").extractLong, exitCode)
   }
 
   def executorAddedFromJson(json: JsonNode): SparkListenerExecutorAdded = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index cbb91ff9dca9..74511d642729 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -75,6 +75,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     testApplicationEventLogging()
   }
 
+  test("End-to-end event logging with exit code") {
+    testEventLogging(exitCode = Some(0))
+  }
+
+  test("End-to-end event logging with exit code being None") {
+    testEventLogging(exitCode = None)
+  }
+
   test("End-to-end event logging with compression") {
     CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
       testApplicationEventLogging(compressionCodec = Some(CompressionCodec.getShortName(codec)))
@@ -218,7 +226,8 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
    */
   private def testEventLogging(
       compressionCodec: Option[String] = None,
-      extraConf: Map[String, String] = Map()): Unit = {
+      extraConf: Map[String, String] = Map(),
+      exitCode: Option[Int] = None): Unit = {
     val conf = getLoggingConf(testDirPath, compressionCodec)
     extraConf.foreach { case (k, v) => conf.set(k, v) }
     val logName = compressionCodec.map("test-" + _).getOrElse("test")
@@ -226,7 +235,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     val listenerBus = new LiveListenerBus(conf)
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
       125L, "Mickey", None)
-    val applicationEnd = SparkListenerApplicationEnd(1000L)
+    val applicationEnd = SparkListenerApplicationEnd(1000L, exitCode)
 
     // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
     eventLogger.start()
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index cfda74509720..2779340e861b 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -42,6 +42,9 @@ object MimaExcludes {
     ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.api.python.BasePythonRunner#ReaderIterator.this"),
     // [SPARK-44198][CORE] Support propagation of the log level to the executors
     ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages$SparkAppConfig$"),
+    // [SPARK-46399][CORE] Add exit status to the Application End event for the use of Spark Listener
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationEnd.*"),
+    ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerApplicationEnd$"),
     // [SPARK-45427][CORE] Add RPC SSL settings to SSLOptions and SparkTransportConf
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.SparkTransportConf.fromSparkConf"),
     // [SPARK-45136][CONNECT] Enhance ClosureCleaner with Ammonite support

