This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d779272cea [MINOR] Move `spark.stage.maxConsecutiveAttempts` to config
8d779272cea is described below

commit 8d779272cea9ec7e3e75aa06f60f0968f1c5ea9e
Author: sychen <[email protected]>
AuthorDate: Wed Jul 19 11:27:26 2023 +0900

    [MINOR] Move `spark.stage.maxConsecutiveAttempts` to config
    
    ### What changes were proposed in this pull request?
    Move `spark.stage.maxConsecutiveAttempts` to `org.apache.spark.internal.config`.
    
    ### Why are the changes needed?
    Avoid hard-coding the `spark.stage.maxConsecutiveAttempts` string; use a typed `ConfigEntry` instead.
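    
    As a minimal sketch (not part of this patch) of the access pattern a typed entry enables, assuming `sc` is a `SparkContext` and `config` is the imported `org.apache.spark.internal.config` package object, as in `DAGScheduler`:
    
        // Before: raw string key, with the default value duplicated at the call site.
        val maxAttemptsBefore = sc.getConf.getInt("spark.stage.maxConsecutiveAttempts", 4)
    
        // After: typed ConfigEntry whose key and default (4) are defined in one place.
        val maxAttemptsAfter = sc.getConf.get(config.STAGE_MAX_CONSECUTIVE_ATTEMPTS)
    
    The typed entry also lets tests set the value without a raw string key, as in the `ShuffleSuite` change below.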
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests.
    
    Closes #42061 from cxzl25/minor_maxConsecutiveAttempts_config.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../main/scala/org/apache/spark/internal/config/package.scala | 11 +++++++++--
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala  | 11 ++++-------
 core/src/test/scala/org/apache/spark/ShuffleSuite.scala       |  2 +-
 3 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 533562af05a..04eba8bddeb 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2255,10 +2255,17 @@ package object config {
       .checkValue(_ >= 0, "needs to be a non-negative value")
       .createWithDefault(5)
 
+  private[spark] val STAGE_MAX_CONSECUTIVE_ATTEMPTS =
+    ConfigBuilder("spark.stage.maxConsecutiveAttempts")
+      .doc("Number of consecutive stage attempts allowed before a stage is aborted.")
+      .version("2.2.0")
+      .intConf
+      .createWithDefault(4)
+
   private[spark] val STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE =
     ConfigBuilder("spark.stage.ignoreDecommissionFetchFailure")
       .doc("Whether ignore stage fetch failure caused by executor decommission when " +
-        "count spark.stage.maxConsecutiveAttempts")
+        s"count ${STAGE_MAX_CONSECUTIVE_ATTEMPTS.key}")
       .version("3.4.0")
       .booleanConf
       .createWithDefault(false)
@@ -2527,7 +2534,7 @@ package object config {
       .doc("Specify the max attempts for a stage - the spark job will be aborted if any of its " +
         "stages is resubmitted multiple times beyond the max retries limitation. The maximum " +
         "number of stage retries is the maximum of `spark.stage.maxAttempts` and " +
-        "`spark.stage.maxConsecutiveAttempts`.")
+        s"`${STAGE_MAX_CONSECUTIVE_ATTEMPTS.key}`.")
       .version("3.5.0")
       .intConf
       .createWithDefault(Int.MaxValue)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index fc83439454d..8a1480fd210 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -229,8 +229,7 @@ private[spark] class DAGScheduler(
    * Number of consecutive stage attempts allowed before a stage is aborted.
    */
   private[scheduler] val maxConsecutiveStageAttempts =
-    sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
-      DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
+    sc.getConf.get(config.STAGE_MAX_CONSECUTIVE_ATTEMPTS)
 
   /**
    * Max stage attempts allowed before a stage is aborted.
@@ -1387,7 +1386,8 @@ private[spark] class DAGScheduler(
         if (stage.getNextAttemptId >= maxStageAttempts) {
           val reason = s"$stage (name=${stage.name}) has been resubmitted for the maximum " +
             s"allowable number of times: ${maxStageAttempts}, which is the max value of " +
-            s"config `spark.stage.maxAttempts` and `spark.stage.maxConsecutiveAttempts`."
+            s"config `${config.STAGE_MAX_ATTEMPTS.key}` and " +
+            s"`${config.STAGE_MAX_CONSECUTIVE_ATTEMPTS.key}`."
           abortStage(stage, reason, None)
         } else {
           val missing = getMissingParentStages(stage).sortBy(_.id)
@@ -1941,7 +1941,7 @@ private[spark] class DAGScheduler(
             isExecutorDecommissioningOrDecommissioned(taskScheduler, bmAddress)
           if (ignoreStageFailure) {
             logInfo(s"Ignoring fetch failure from $task of $failedStage attempt " +
-              s"${task.stageAttemptId} when count spark.stage.maxConsecutiveAttempts " +
+              s"${task.stageAttemptId} when count ${config.STAGE_MAX_CONSECUTIVE_ATTEMPTS.key} " +
               s"as executor ${bmAddress.executorId} is decommissioned and " +
               s" ${config.STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE.key}=true")
           } else {
@@ -3081,7 +3081,4 @@ private[spark] object DAGScheduler {
   // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
   // as more failure events come in
   val RESUBMIT_TIMEOUT = 200
-
-  // Number of consecutive stage attempts allowed before a stage is aborted
-  val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
 }
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 6022d49a0b5..e4e6ec45a97 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -452,7 +452,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalRootDi
     val newConf = conf.clone
       .set(config.SHUFFLE_CHECKSUM_ENABLED, true)
       .set(TEST_NO_STAGE_RETRY, false)
-      .set("spark.stage.maxConsecutiveAttempts", "1")
+      .set(config.STAGE_MAX_CONSECUTIVE_ATTEMPTS, 1)
     sc = new SparkContext("local-cluster[2, 1, 2048]", "test", newConf)
     val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _)
     // materialize the shuffle map outputs


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
