This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 563242b5180 [SPARK-43193][SS] Remove workaround for HADOOP-12074
563242b5180 is described below

commit 563242b518058f517292021148073ca47362b45d
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Apr 21 13:01:30 2023 +0900

    [SPARK-43193][SS] Remove workaround for HADOOP-12074
    
    ### What changes were proposed in this pull request?
    
    SPARK-19718 introduced different code branches for pre-Hadoop 2.8 (without 
[HADOOP-12074](https://issues.apache.org/jira/browse/HADOOP-12074)) and Hadoop 
2.8+ (with [HADOOP-12074](https://issues.apache.org/jira/browse/HADOOP-12074)):
    
    > 1. Check if the message of IOException starts with 
`java.lang.InterruptedException`. If so, treat it as `InterruptedException`. 
This is for pre-Hadoop 2.8.
    > 2. Treat `InterruptedIOException` as `InterruptedException`. This is for 
Hadoop 2.8+ and other places that may throw `InterruptedIOException` when the 
thread is interrupted.
    
    This PR removes branch (1), since Spark no longer supports Hadoop 2.
    
    ### Why are the changes needed?
    
    Since [SPARK-42452](https://issues.apache.org/jira/browse/SPARK-42452) 
removed support for Hadoop 2, we can now remove the workaround code for 
pre-Hadoop 2.8.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Passes GitHub Actions (GA).
    
    Closes #40852 from pan3793/SPARK-43193.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../sql/execution/streaming/StreamExecution.scala  |  8 +---
 .../apache/spark/sql/streaming/StreamSuite.scala   | 55 ++--------------------
 2 files changed, 5 insertions(+), 58 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6b993a414be..e5077fa7f7b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.io.{InterruptedIOException, IOException, UncheckedIOException}
+import java.io.{InterruptedIOException, UncheckedIOException}
 import java.nio.channels.ClosedByInterruptException
 import java.util.UUID
 import java.util.concurrent.{CountDownLatch, ExecutionException, 
TimeoutException, TimeUnit}
@@ -313,12 +313,6 @@ abstract class StreamExecution(
       case e if isInterruptedByStop(e, sparkSession.sparkContext) =>
         // interrupted by stop()
         updateStatusMessage("Stopped")
-      case e: IOException if e.getMessage != null
-        && e.getMessage.startsWith(classOf[InterruptedException].getName)
-        && state.get == TERMINATED =>
-        // This is a workaround for HADOOP-12074: `Shell.runCommand` converts 
`InterruptedException`
-        // to `new IOException(ie.toString())` before Hadoop 2.8.
-        updateStatusMessage("Stopped")
       case e: Throwable =>
         val message = if (e.getMessage == null) "" else e.getMessage
         streamDeathCause = new StreamingQueryException(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 62ef5824ed5..8c6402359bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
-import java.io.{File, InterruptedIOException, IOException, 
UncheckedIOException}
+import java.io.{File, InterruptedIOException, UncheckedIOException}
 import java.nio.channels.ClosedByInterruptException
 import java.time.ZoneId
 import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
@@ -666,26 +666,8 @@ class StreamSuite extends StreamTest {
     }
   }
 
-  test("handle IOException when the streaming thread is interrupted (pre 
Hadoop 2.8)") {
-    // This test uses a fake source to throw the same IOException as pre 
Hadoop 2.8 when the
-    // streaming thread is interrupted. We should handle it properly by not 
failing the query.
-    ThrowingIOExceptionLikeHadoop12074.createSourceLatch = new 
CountDownLatch(1)
-    val query = spark
-      .readStream
-      .format(classOf[ThrowingIOExceptionLikeHadoop12074].getName)
-      .load()
-      .writeStream
-      .format("console")
-      .start()
-    assert(ThrowingIOExceptionLikeHadoop12074.createSourceLatch
-      .await(streamingTimeout.toMillis, TimeUnit.MILLISECONDS),
-      "ThrowingIOExceptionLikeHadoop12074.createSource wasn't called before 
timeout")
-    query.stop()
-    assert(query.exception.isEmpty)
-  }
-
-  test("handle InterruptedIOException when the streaming thread is interrupted 
(Hadoop 2.8+)") {
-    // This test uses a fake source to throw the same InterruptedIOException 
as Hadoop 2.8+ when the
+  test("handle InterruptedIOException when the streaming thread is 
interrupted") {
+    // This test uses a fake source to throw the same InterruptedIOException 
as Hadoop when the
     // streaming thread is interrupted. We should handle it properly by not 
failing the query.
     ThrowingInterruptedIOException.createSourceLatch = new CountDownLatch(1)
     val query = spark
@@ -1363,36 +1345,7 @@ class FakeDefaultSource extends FakeSource {
   }
 }
 
-/** A fake source that throws the same IOException like pre Hadoop 2.8 when 
it's interrupted. */
-class ThrowingIOExceptionLikeHadoop12074 extends FakeSource {
-  import ThrowingIOExceptionLikeHadoop12074._
-
-  override def createSource(
-      spark: SQLContext,
-      metadataPath: String,
-      schema: Option[StructType],
-      providerName: String,
-      parameters: Map[String, String]): Source = {
-    createSourceLatch.countDown()
-    try {
-      Thread.sleep(30000)
-      throw new TimeoutException("sleep was not interrupted in 30 seconds")
-    } catch {
-      case ie: InterruptedException =>
-        throw new IOException(ie.toString)
-    }
-  }
-}
-
-object ThrowingIOExceptionLikeHadoop12074 {
-  /**
-   * A latch to allow the user to wait until 
`ThrowingIOExceptionLikeHadoop12074.createSource` is
-   * called.
-   */
-  @volatile var createSourceLatch: CountDownLatch = null
-}
-
-/** A fake source that throws InterruptedIOException like Hadoop 2.8+ when 
it's interrupted. */
+/** A fake source that throws InterruptedIOException like Hadoop when it's 
interrupted. */
 class ThrowingInterruptedIOException extends FakeSource {
   import ThrowingInterruptedIOException._
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to