This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 8cd3c1a9c1c [SPARK-45568][TESTS] Fix flaky WholeStageCodegenSparkSubmitSuite
8cd3c1a9c1c is described below
commit 8cd3c1a9c1c336155fe09728171aba84ef55ef2d
Author: Kent Yao <[email protected]>
AuthorDate: Tue Oct 17 22:19:18 2023 +0800
[SPARK-45568][TESTS] Fix flaky WholeStageCodegenSparkSubmitSuite
### What changes were proposed in this pull request?
WholeStageCodegenSparkSubmitSuite is
[flaky](https://github.com/apache/spark/actions/runs/6479534195/job/17593342589)
because the default SHUFFLE_PARTITIONS value (200) creates 200 reducers for a
single total core, and the improper stop procedure (the session was never
stopped) causes executor launcher retries. The heavy load and the retries
might result in timeout test failures.
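To illustrate the fix, here is a minimal, self-contained sketch of the same pattern: pin the shuffle partition count to the cores actually available, and stop the session in a `finally` block so the launcher never has to retry a half-stopped application. The object name and the placeholder workload are illustrative only, not part of the patch, and the sketch assumes it is launched via spark-submit with a master already configured.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

// Illustrative sketch of the fix pattern, not the actual test code.
object ShufflePartitionsSketch {
  def main(args: Array[String]): Unit = {
    // Use 2 shuffle partitions instead of the default 200, so a one-core
    // executor is not asked to schedule 200 reducers.
    val spark = SparkSession.builder()
      .config(SQLConf.SHUFFLE_PARTITIONS.key, "2")
      .getOrCreate()
    try {
      // Placeholder workload standing in for the suite's assertions.
      spark.range(10).groupBy("id").count().collect()
    } finally {
      // Stop the session unconditionally so executors shut down cleanly.
      spark.stop()
    }
  }
}
```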
### Why are the changes needed?
CI robustness
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing WholeStageCodegenSparkSubmitSuite
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #43394 from yaooqinn/SPARK-45568.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
(cherry picked from commit f00ec39542a5f9ac75d8c24f0f04a7be703c8d7c)
Signed-off-by: Kent Yao <[email protected]>
---
.../WholeStageCodegenSparkSubmitSuite.scala | 57 ++++++++++++----------
1 file changed, 30 insertions(+), 27 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
index 73c4e4c3e1e..06ba8fb772a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.deploy.SparkSubmitTestUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{QueryTest, Row, SparkSession}
import org.apache.spark.sql.functions.{array, col, count, lit}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.unsafe.Platform
import org.apache.spark.util.ResetSystemProperties
@@ -68,39 +69,41 @@ class WholeStageCodegenSparkSubmitSuite extends SparkSubmitTestUtils
object WholeStageCodegenSparkSubmitSuite extends Assertions with Logging {
- var spark: SparkSession = _
-
def main(args: Array[String]): Unit = {
TestUtils.configTestLog4j2("INFO")
- spark = SparkSession.builder().getOrCreate()
+ val spark = SparkSession.builder()
+ .config(SQLConf.SHUFFLE_PARTITIONS.key, "2")
+ .getOrCreate()
+
+ try {
+ // Make sure the test is run where the driver and the executors uses different object layouts
+ val driverArrayHeaderSize = Platform.BYTE_ARRAY_OFFSET
+ val executorArrayHeaderSize =
+ spark.sparkContext.range(0, 1).map(_ => Platform.BYTE_ARRAY_OFFSET).collect().head
+ assert(driverArrayHeaderSize > executorArrayHeaderSize)
- // Make sure the test is run where the driver and the executors uses different object layouts
- val driverArrayHeaderSize = Platform.BYTE_ARRAY_OFFSET
- val executorArrayHeaderSize =
- spark.sparkContext.range(0, 1).map(_ => Platform.BYTE_ARRAY_OFFSET).collect.head.toInt
- assert(driverArrayHeaderSize > executorArrayHeaderSize)
+ val df = spark.range(71773).select((col("id") % lit(10)).cast(IntegerType) as "v")
+ .groupBy(array(col("v"))).agg(count(col("*")))
+ val plan = df.queryExecution.executedPlan
+ assert(plan.exists(_.isInstanceOf[WholeStageCodegenExec]))
- val df = spark.range(71773).select((col("id") % lit(10)).cast(IntegerType) as "v")
- .groupBy(array(col("v"))).agg(count(col("*")))
- val plan = df.queryExecution.executedPlan
- assert(plan.exists(_.isInstanceOf[WholeStageCodegenExec]))
+ val expectedAnswer =
+ Row(Array(0), 7178) ::
+ Row(Array(1), 7178) ::
+ Row(Array(2), 7178) ::
+ Row(Array(3), 7177) ::
+ Row(Array(4), 7177) ::
+ Row(Array(5), 7177) ::
+ Row(Array(6), 7177) ::
+ Row(Array(7), 7177) ::
+ Row(Array(8), 7177) ::
+ Row(Array(9), 7177) :: Nil
- val expectedAnswer =
- Row(Array(0), 7178) ::
- Row(Array(1), 7178) ::
- Row(Array(2), 7178) ::
- Row(Array(3), 7177) ::
- Row(Array(4), 7177) ::
- Row(Array(5), 7177) ::
- Row(Array(6), 7177) ::
- Row(Array(7), 7177) ::
- Row(Array(8), 7177) ::
- Row(Array(9), 7177) :: Nil
- val result = df.collect
- QueryTest.sameRows(result.toSeq, expectedAnswer) match {
- case Some(errMsg) => fail(errMsg)
- case _ =>
+ QueryTest.checkAnswer(df, expectedAnswer)
+ } finally {
+ spark.stop()
}
+
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]