This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f9e468eafdf0 [SPARK-46399][CORE] Add exit status to the Application
End event for the use of Spark Listener
f9e468eafdf0 is described below
commit f9e468eafdf0682a2353897677d8487e1c2ab613
Author: Reza Safi <[email protected]>
AuthorDate: Wed Dec 20 02:20:56 2023 -0600
[SPARK-46399][CORE] Add exit status to the Application End event for the
use of Spark Listener
### What changes were proposed in this pull request?
Currently SparkListenerApplicationEnd only has a timestamp value and there
is not exit status recorded with it.
This change will exitcode to the SparkListenerApplicationEnd event.
### Why are the changes needed?
Without this it is hard to understand whether an attempt has failed or
succeeded when using spark listeners.
### Does this PR introduce _any_ user-facing change?
No, the added exitCode is an optional parameter of the event.
### How was this patch tested?
Locally using the normal tests. also anew tests is added.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #44340 from rezasafi/rezaaddstatuslistener.
Authored-by: Reza Safi <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
core/src/main/scala/org/apache/spark/SparkContext.scala | 6 +++---
.../scala/org/apache/spark/scheduler/SparkListener.scala | 4 +++-
.../src/main/scala/org/apache/spark/util/JsonProtocol.scala | 4 +++-
.../apache/spark/scheduler/EventLoggingListenerSuite.scala | 13 +++++++++++--
project/MimaExcludes.scala | 3 +++
5 files changed, 23 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b43bb08d25a7..da37fa83254b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2273,7 +2273,7 @@ class SparkContext(config: SparkConf) extends Logging {
if (listenerBus != null) {
Utils.tryLogNonFatalError {
- postApplicationEnd()
+ postApplicationEnd(exitCode)
}
}
Utils.tryLogNonFatalError {
@@ -2803,8 +2803,8 @@ class SparkContext(config: SparkConf) extends Logging {
}
/** Post the application end event */
- private def postApplicationEnd(): Unit = {
- listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis))
+ private def postApplicationEnd(exitCode: Int): Unit = {
+ listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis,
Some(exitCode)))
}
/** Post the environment update event once the task scheduler is ready */
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index fd846545d689..cc19b71bfc4d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -289,7 +289,9 @@ case class SparkListenerApplicationStart(
driverAttributes: Option[Map[String, String]] = None) extends
SparkListenerEvent
@DeveloperApi
-case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
+case class SparkListenerApplicationEnd(
+ time: Long,
+ exitCode: Option[Int] = None) extends SparkListenerEvent
/**
* An internal class that describes the metadata of an event log.
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 987e5b4328f9..22dcf6c11e4b 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -277,6 +277,7 @@ private[spark] object JsonProtocol extends JsonUtils {
g.writeStartObject()
g.writeStringField("Event",
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationEnd)
g.writeNumberField("Timestamp", applicationEnd.time)
+ applicationEnd.exitCode.foreach(exitCode => g.writeNumberField("ExitCode",
exitCode))
g.writeEndObject()
}
@@ -1065,7 +1066,8 @@ private[spark] object JsonProtocol extends JsonUtils {
}
def applicationEndFromJson(json: JsonNode): SparkListenerApplicationEnd = {
- SparkListenerApplicationEnd(json.get("Timestamp").extractLong)
+ val exitCode = jsonOption(json.get("ExitCode")).map(_.extractInt)
+ SparkListenerApplicationEnd(json.get("Timestamp").extractLong, exitCode)
}
def executorAddedFromJson(json: JsonNode): SparkListenerExecutorAdded = {
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index cbb91ff9dca9..74511d642729 100644
---
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -75,6 +75,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with
LocalSparkContext wit
testApplicationEventLogging()
}
+ test("End-to-end event logging with exit code") {
+ testEventLogging(exitCode = Some(0))
+ }
+
+ test("End-to-end event logging with exit code being None") {
+ testEventLogging(exitCode = None)
+ }
+
test("End-to-end event logging with compression") {
CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
testApplicationEventLogging(compressionCodec =
Some(CompressionCodec.getShortName(codec)))
@@ -218,7 +226,8 @@ class EventLoggingListenerSuite extends SparkFunSuite with
LocalSparkContext wit
*/
private def testEventLogging(
compressionCodec: Option[String] = None,
- extraConf: Map[String, String] = Map()): Unit = {
+ extraConf: Map[String, String] = Map(),
+ exitCode: Option[Int] = None): Unit = {
val conf = getLoggingConf(testDirPath, compressionCodec)
extraConf.foreach { case (k, v) => conf.set(k, v) }
val logName = compressionCodec.map("test-" + _).getOrElse("test")
@@ -226,7 +235,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with
LocalSparkContext wit
val listenerBus = new LiveListenerBus(conf)
val applicationStart = SparkListenerApplicationStart("Greatest App
(N)ever", None,
125L, "Mickey", None)
- val applicationEnd = SparkListenerApplicationEnd(1000L)
+ val applicationEnd = SparkListenerApplicationEnd(1000L, exitCode)
// A comprehensive test on JSON de/serialization of all events is in
JsonProtocolSuite
eventLogger.start()
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index cfda74509720..2779340e861b 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -42,6 +42,9 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.api.python.BasePythonRunner#ReaderIterator.this"),
// [SPARK-44198][CORE] Support propagation of the log level to the
executors
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages$SparkAppConfig$"),
+ //[SPARK-46399][Core] Add exit status to the Application End event for the
use of Spark Listener
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationEnd.*"),
+
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerApplicationEnd$"),
// [SPARK-45427][CORE] Add RPC SSL settings to SSLOptions and
SparkTransportConf
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.SparkTransportConf.fromSparkConf"),
// [SPARK-45136][CONNECT] Enhance ClosureCleaner with Ammonite support
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]