This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ff0d998431 [SPARK-42112][SQL][SS] Add null check before `ContinuousWriteRDD#compute` closes `dataWriter`
3ff0d998431 is described below

commit 3ff0d998431df99ae3e2f5e1fce3125f5d039c52
Author: yangjie01 <[email protected]>
AuthorDate: Thu Jan 19 19:23:55 2023 -0800

    [SPARK-42112][SQL][SS] Add null check before `ContinuousWriteRDD#compute` closes `dataWriter`
    
    ### What changes were proposed in this pull request?
    This PR adds a null check before `ContinuousWriteRDD#compute` closes `dataWriter`, to avoid an NPE.
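    
    Below is a minimal, self-contained sketch of why the check is needed (an illustration only, not the actual Spark code; `DataWriterLike` and `runPartition` are hypothetical names): if creating the writer fails, `dataWriter` is still `null` when the `finally` logic runs, and an unguarded `close()` throws an NPE.
    
    ```scala
    // Hypothetical illustration; the real logic lives in ContinuousWriteRDD#compute.
    trait DataWriterLike {
      def commit(): Unit
      def abort(): Unit
      def close(): Unit
    }
    
    def runPartition(createWriter: () => DataWriterLike): Unit = {
      var dataWriter: DataWriterLike = null
      try {
        dataWriter = createWriter() // may throw before dataWriter is assigned
        dataWriter.commit()
      } catch {
        case e: Throwable =>
          if (dataWriter != null) dataWriter.abort()
          throw e
      } finally {
        // Without this null check, a failure in createWriter() leaves
        // dataWriter null and close() throws a NullPointerException here.
        if (dataWriter != null) dataWriter.close()
      }
    }
    ```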
    
    ### Why are the changes needed?
    Run the following commands:
    
    ```
    mvn clean test -pl sql/core -Dtest=none -DwildcardSuites=org.apache.spark.sql.streaming.continuous.ContinuousSuite -am
    ```
    
    All tests passed, but there is an NPE in the test log:
    
    ```
    - repeatedly restart
    ...
    
    16:07:39.891 ERROR org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD: Writer for partition 1 is aborting.
    16:07:39.891 ERROR org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD: Writer for partition 1 aborted.
    16:07:39.892 WARN org.apache.spark.util.Utils: Suppressing exception in finally: null
    java.lang.NullPointerException
            at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.$anonfun$compute$7(ContinuousWriteRDD.scala:91)
            at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1558)
            at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.compute(ContinuousWriteRDD.scala:91)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
            at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
            at org.apache.spark.scheduler.Task.run(Task.scala:139)
            at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1502)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:750)
    ```
    
    The test did not fail because `Utils.tryWithSafeFinallyAndFailureCallbacks` suppresses exceptions thrown in the `finally` block, but we should still avoid throwing this NPE.
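    
    A simplified sketch of that suppression behavior (an illustration, not Spark's actual `Utils` implementation): when the main block has already failed, an exception thrown by the `finally` block is attached as a suppressed exception and logged rather than propagated on its own, which is why the NPE above never failed the test.
    
    ```scala
    // Hypothetical helper mimicking the "Suppressing exception in finally" behavior.
    def tryWithSafeFinallySketch[T](block: => T)(finallyBlock: => Unit): T = {
      var original: Throwable = null
      try {
        block
      } catch {
        case t: Throwable =>
          original = t
          throw t
      } finally {
        try {
          finallyBlock
        } catch {
          // Only swallow the finally-block failure if the body already failed;
          // the original error keeps propagating with this one attached.
          case t: Throwable if original != null =>
            original.addSuppressed(t)
            println(s"Suppressing exception in finally: ${t.getMessage}")
        }
      }
    }
    ```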
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    - Passed GitHub Actions
    - Ran the following command manually; after this PR there is no NPE-related log in the test output.
    
    ```
    mvn clean test -pl sql/core -Dtest=none -DwildcardSuites=org.apache.spark.sql.streaming.continuous.ContinuousSuite -am
    ```
    
    Closes #39650 from LuciferYang/SPARK-42112.
    
    Authored-by: yangjie01 <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
index e2a1f412dcc..688b66716ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
@@ -88,7 +88,7 @@ class ContinuousWriteRDD(var prev: RDD[InternalRow], writerFactory: StreamingDat
         if (dataWriter != null) dataWriter.abort()
         logError(s"Writer for partition ${context.partitionId()} aborted.")
       }, finallyBlock = {
-        dataWriter.close()
+        if (dataWriter != null) dataWriter.close()
       })
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
