This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 43f79326106 [SPARK-46480][CORE][SQL] Fix NPE when table cache task
attempt
43f79326106 is described below
commit 43f79326106acb277b9edfb28c34f5dc310b416b
Author: ulysses-you <[email protected]>
AuthorDate: Fri Dec 22 13:26:02 2023 +0800
[SPARK-46480][CORE][SQL] Fix NPE when table cache task attempt
### What changes were proposed in this pull request?
This PR adds a check: we only mark the cached partition as materialized if
the task has not failed and has not been interrupted. It also adds a new method
`isFailed` to `TaskContext`.
### Why are the changes needed?
Before this PR, when caching a table, a task failure could cause an NPE in other tasks:
```
java.lang.NullPointerException
at java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.accessors1$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown
Source)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:155)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
```
### Does this PR introduce _any_ user-facing change?
yes, it's a bug fix
### How was this patch tested?
Added a test for `TaskContext.isFailed` in `TaskContextSuite`.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #44445 from ulysses-you/fix-cache.
Authored-by: ulysses-you <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
core/src/main/scala/org/apache/spark/BarrierTaskContext.scala | 2 ++
core/src/main/scala/org/apache/spark/TaskContext.scala | 5 +++++
core/src/main/scala/org/apache/spark/TaskContextImpl.scala | 2 ++
.../scala/org/apache/spark/scheduler/TaskContextSuite.scala | 10 ++++++++++
project/MimaExcludes.scala | 4 +++-
.../apache/spark/sql/execution/columnar/InMemoryRelation.scala | 8 +++++---
6 files changed, 27 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index 0f9abaf94ae..50aff8b0fb1 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -194,6 +194,8 @@ class BarrierTaskContext private[spark] (
override def isCompleted(): Boolean = taskContext.isCompleted()
+ override def isFailed(): Boolean = taskContext.isFailed()
+
override def isInterrupted(): Boolean = taskContext.isInterrupted()
override def addTaskCompletionListener(listener: TaskCompletionListener):
this.type = {
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala
b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 0f8a10d734b..15ddd08fb4a 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -94,6 +94,11 @@ abstract class TaskContext extends Serializable {
*/
def isCompleted(): Boolean
+ /**
+ * Returns true if the task has failed.
+ */
+ def isFailed(): Boolean
+
/**
* Returns true if the task has been killed.
*/
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 8d2c2ab9bc4..a3c36de1515 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -275,6 +275,8 @@ private[spark] class TaskContextImpl(
@GuardedBy("this")
override def isCompleted(): Boolean = synchronized(completed)
+ override def isFailed(): Boolean = synchronized(failureCauseOpt.isDefined)
+
override def isInterrupted(): Boolean = reasonIfKilled.isDefined
override def getLocalProperty(key: String): String =
localProperties.getProperty(key)
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index c56fd3fd1f5..9aba41cea21 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -670,6 +670,16 @@ class TaskContextSuite extends SparkFunSuite with
BeforeAndAfter with LocalSpark
assert(invocationOrder === Seq("C", "B", "A", "D"))
}
+ test("SPARK-46480: Add isFailed in TaskContext") {
+ val context = TaskContext.empty()
+ var isFailed = false
+ context.addTaskCompletionListener[Unit] { context =>
+ isFailed = context.isFailed()
+ }
+ context.markTaskFailed(new RuntimeException())
+ context.markTaskCompleted(None)
+ assert(isFailed)
+ }
}
private object TaskContextSuite {
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 2779340e861..eb4c130cc6a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -57,7 +57,9 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.types.Decimal.fromStringANSI$default$3"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.types.Decimal.fromStringANSI"),
// [SPARK-45762][CORE] Support shuffle managers defined in user jars by
changing startup order
-
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.this")
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.this"),
+ // [SPARK-46480][CORE][SQL] Fix NPE when table cache task attempt
+
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.isFailed")
)
// Default exclude rules
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index af958208afd..c016fd52b61 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -285,9 +285,11 @@ case class CachedRDDBuilder(
cachedPlan.conf)
}
val cached = cb.mapPartitionsInternal { it =>
- TaskContext.get().addTaskCompletionListener[Unit](_ => {
- materializedPartitions.add(1L)
- })
+ TaskContext.get().addTaskCompletionListener[Unit] { context =>
+ if (!context.isFailed() && !context.isInterrupted()) {
+ materializedPartitions.add(1L)
+ }
+ }
new Iterator[CachedBatch] {
override def hasNext: Boolean = it.hasNext
override def next(): CachedBatch = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]