This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8d779272cea [MINOR] Move `spark.stage.maxConsecutiveAttempts` to config
8d779272cea is described below
commit 8d779272cea9ec7e3e75aa06f60f0968f1c5ea9e
Author: sychen <[email protected]>
AuthorDate: Wed Jul 19 11:27:26 2023 +0900
[MINOR] Move `spark.stage.maxConsecutiveAttempts` to config
### What changes were proposed in this pull request?
Move `spark.stage.maxConsecutiveAttempts` to
`org.apache.spark.internal.config`.
### Why are the changes needed?
Avoid hard-coding the `spark.stage.maxConsecutiveAttempts` key as a string literal; callers reference the typed config entry instead.
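For illustration only (not part of the patch), a minimal before/after sketch of the lookup; it assumes the caller lives under `org.apache.spark`, since both the entry and the `SparkConf` accessor taking a `ConfigEntry` are `private[spark]`:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.internal.config

// Hypothetical helper, only to show the two call shapes side by side.
def maxConsecutiveStageAttempts(sc: SparkContext): Int = {
  // Before: the key and the default value are repeated as literals at the call site.
  //   sc.getConf.getInt("spark.stage.maxConsecutiveAttempts", 4)

  // After: the typed entry carries the key, the Int type, and the default (4),
  // so a typo in the key or a mismatched default can no longer slip in here.
  sc.getConf.get(config.STAGE_MAX_CONSECUTIVE_ATTEMPTS)
}
```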
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests.
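As a sketch of what such a test setup looks like with the new entry (mirroring the `ShuffleSuite` hunk in the diff below; again assuming Spark-internal code, because the typed `SparkConf.set` overload is `private[spark]`):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.config

val conf = new SparkConf()
  .setMaster("local-cluster[2, 1, 2048]")
  .setAppName("test")
  // Typed setter: the value is an Int, not the string "1", so the compiler
  // checks it against the entry's declared type.
  .set(config.STAGE_MAX_CONSECUTIVE_ATTEMPTS, 1)
val sc = new SparkContext(conf)
```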
Closes #42061 from cxzl25/minor_maxConsecutiveAttempts_config.
Authored-by: sychen <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../main/scala/org/apache/spark/internal/config/package.scala | 11 +++++++++--
.../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 11 ++++-------
core/src/test/scala/org/apache/spark/ShuffleSuite.scala | 2 +-
3 files changed, 14 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 533562af05a..04eba8bddeb 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2255,10 +2255,17 @@ package object config {
.checkValue(_ >= 0, "needs to be a non-negative value")
.createWithDefault(5)
+ private[spark] val STAGE_MAX_CONSECUTIVE_ATTEMPTS =
+ ConfigBuilder("spark.stage.maxConsecutiveAttempts")
+ .doc("Number of consecutive stage attempts allowed before a stage is
aborted.")
+ .version("2.2.0")
+ .intConf
+ .createWithDefault(4)
+
private[spark] val STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE =
ConfigBuilder("spark.stage.ignoreDecommissionFetchFailure")
.doc("Whether ignore stage fetch failure caused by executor decommission
when " +
- "count spark.stage.maxConsecutiveAttempts")
+ s"count ${STAGE_MAX_CONSECUTIVE_ATTEMPTS.key}")
.version("3.4.0")
.booleanConf
.createWithDefault(false)
@@ -2527,7 +2534,7 @@ package object config {
.doc("Specify the max attempts for a stage - the spark job will be
aborted if any of its " +
"stages is resubmitted multiple times beyond the max retries
limitation. The maximum " +
"number of stage retries is the maximum of `spark.stage.maxAttempts`
and " +
- "`spark.stage.maxConsecutiveAttempts`.")
+ s"`${STAGE_MAX_CONSECUTIVE_ATTEMPTS.key}`.")
.version("3.5.0")
.intConf
.createWithDefault(Int.MaxValue)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index fc83439454d..8a1480fd210 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -229,8 +229,7 @@ private[spark] class DAGScheduler(
* Number of consecutive stage attempts allowed before a stage is aborted.
*/
private[scheduler] val maxConsecutiveStageAttempts =
- sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
- DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
+ sc.getConf.get(config.STAGE_MAX_CONSECUTIVE_ATTEMPTS)
/**
* Max stage attempts allowed before a stage is aborted.
@@ -1387,7 +1386,8 @@ private[spark] class DAGScheduler(
if (stage.getNextAttemptId >= maxStageAttempts) {
val reason = s"$stage (name=${stage.name}) has been resubmitted for
the maximum " +
s"allowable number of times: ${maxStageAttempts}, which is the max
value of " +
- s"config `spark.stage.maxAttempts` and
`spark.stage.maxConsecutiveAttempts`."
+ s"config `${config.STAGE_MAX_ATTEMPTS.key}` and " +
+ s"`${config.STAGE_MAX_CONSECUTIVE_ATTEMPTS.key}`."
abortStage(stage, reason, None)
} else {
val missing = getMissingParentStages(stage).sortBy(_.id)
@@ -1941,7 +1941,7 @@ private[spark] class DAGScheduler(
isExecutorDecommissioningOrDecommissioned(taskScheduler, bmAddress)
if (ignoreStageFailure) {
logInfo(s"Ignoring fetch failure from $task of $failedStage
attempt " +
- s"${task.stageAttemptId} when count
spark.stage.maxConsecutiveAttempts " +
+ s"${task.stageAttemptId} when count
${config.STAGE_MAX_CONSECUTIVE_ATTEMPTS.key} " +
s"as executor ${bmAddress.executorId} is decommissioned and " +
s" ${config.STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE.key}=true")
} else {
@@ -3081,7 +3081,4 @@ private[spark] object DAGScheduler {
// this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
// as more failure events come in
val RESUBMIT_TIMEOUT = 200
-
- // Number of consecutive stage attempts allowed before a stage is aborted
- val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
}
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 6022d49a0b5..e4e6ec45a97 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -452,7 +452,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalRootDi
val newConf = conf.clone
.set(config.SHUFFLE_CHECKSUM_ENABLED, true)
.set(TEST_NO_STAGE_RETRY, false)
- .set("spark.stage.maxConsecutiveAttempts", "1")
+ .set(config.STAGE_MAX_CONSECUTIVE_ATTEMPTS, 1)
sc = new SparkContext("local-cluster[2, 1, 2048]", "test", newConf)
val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _)
// materialize the shuffle map outputs
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]