This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 563242b5180 [SPARK-43193][SS] Remove workaround for HADOOP-12074
563242b5180 is described below
commit 563242b518058f517292021148073ca47362b45d
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Apr 21 13:01:30 2023 +0900
[SPARK-43193][SS] Remove workaround for HADOOP-12074
### What changes were proposed in this pull request?
SPARK-19718 introduced different code branches for pre-Hadoop 2.8 (w/o
[HADOOP-12074](https://issues.apache.org/jira/browse/HADOOP-12074)) and Hadoop
2.8+ (w/ [HADOOP-12074](https://issues.apache.org/jira/browse/HADOOP-12074)):
> 1. Check if the message of the `IOException` starts with
> `java.lang.InterruptedException`. If so, treat it as `InterruptedException`.
> This is for pre-Hadoop 2.8.
> 2. Treat `InterruptedIOException` as `InterruptedException`. This is for
> Hadoop 2.8+ and other places that may throw `InterruptedIOException` when the
> thread is interrupted.
This PR removes branch (1), since Spark no longer supports Hadoop 2.
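For illustration, here is a minimal sketch of the two classification branches described above; the object name, method name, and `isTerminated` flag are hypothetical stand-ins, not the actual `StreamExecution` code:

```scala
import java.io.{IOException, InterruptedIOException}

// Hedged sketch of the two branches; names are illustrative only.
object InterruptClassifierSketch {
  def isQueryStopException(e: Throwable, isTerminated: Boolean): Boolean = e match {
    // (2) Hadoop 2.8+ (and other libraries) signal interruption with
    //     InterruptedIOException; this handling is kept.
    case _: InterruptedIOException => true
    // (1) Pre-Hadoop 2.8: Shell.runCommand converted InterruptedException into
    //     new IOException(ie.toString()), so only the message reveals the real
    //     cause. This is the branch the PR removes.
    case io: IOException if io.getMessage != null &&
        io.getMessage.startsWith(classOf[InterruptedException].getName) &&
        isTerminated => true
    case _ => false
  }
}
```

With Hadoop 2 support gone, only the `InterruptedIOException` handling (together with the existing `isInterruptedByStop` check) remains in `StreamExecution`.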
### Why are the changes needed?
Since [SPARK-42452](https://issues.apache.org/jira/browse/SPARK-42452)
removed support for Hadoop 2, we can now remove the workaround code for
pre-Hadoop 2.8.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passed GA (GitHub Actions).
Closes #40852 from pan3793/SPARK-43193.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../sql/execution/streaming/StreamExecution.scala | 8 +---
.../apache/spark/sql/streaming/StreamSuite.scala | 55 ++--------------------
2 files changed, 5 insertions(+), 58 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6b993a414be..e5077fa7f7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.execution.streaming
-import java.io.{InterruptedIOException, IOException, UncheckedIOException}
+import java.io.{InterruptedIOException, UncheckedIOException}
 import java.nio.channels.ClosedByInterruptException
 import java.util.UUID
 import java.util.concurrent.{CountDownLatch, ExecutionException, TimeoutException, TimeUnit}
@@ -313,12 +313,6 @@ abstract class StreamExecution(
       case e if isInterruptedByStop(e, sparkSession.sparkContext) =>
         // interrupted by stop()
         updateStatusMessage("Stopped")
-      case e: IOException if e.getMessage != null
-        && e.getMessage.startsWith(classOf[InterruptedException].getName)
-        && state.get == TERMINATED =>
-        // This is a workaround for HADOOP-12074: `Shell.runCommand` converts `InterruptedException`
-        // to `new IOException(ie.toString())` before Hadoop 2.8.
-        updateStatusMessage("Stopped")
       case e: Throwable =>
         val message = if (e.getMessage == null) "" else e.getMessage
         streamDeathCause = new StreamingQueryException(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 62ef5824ed5..8c6402359bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.streaming
-import java.io.{File, InterruptedIOException, IOException, UncheckedIOException}
+import java.io.{File, InterruptedIOException, UncheckedIOException}
 import java.nio.channels.ClosedByInterruptException
 import java.time.ZoneId
 import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
@@ -666,26 +666,8 @@ class StreamSuite extends StreamTest {
     }
   }
-  test("handle IOException when the streaming thread is interrupted (pre Hadoop 2.8)") {
-    // This test uses a fake source to throw the same IOException as pre Hadoop 2.8 when the
-    // streaming thread is interrupted. We should handle it properly by not failing the query.
-    ThrowingIOExceptionLikeHadoop12074.createSourceLatch = new CountDownLatch(1)
-    val query = spark
-      .readStream
-      .format(classOf[ThrowingIOExceptionLikeHadoop12074].getName)
-      .load()
-      .writeStream
-      .format("console")
-      .start()
-    assert(ThrowingIOExceptionLikeHadoop12074.createSourceLatch
-      .await(streamingTimeout.toMillis, TimeUnit.MILLISECONDS),
-      "ThrowingIOExceptionLikeHadoop12074.createSource wasn't called before timeout")
-    query.stop()
-    assert(query.exception.isEmpty)
-  }
-
-  test("handle InterruptedIOException when the streaming thread is interrupted (Hadoop 2.8+)") {
-    // This test uses a fake source to throw the same InterruptedIOException as Hadoop 2.8+ when the
+  test("handle InterruptedIOException when the streaming thread is interrupted") {
+    // This test uses a fake source to throw the same InterruptedIOException as Hadoop when the
     // streaming thread is interrupted. We should handle it properly by not failing the query.
     ThrowingInterruptedIOException.createSourceLatch = new CountDownLatch(1)
     val query = spark
@@ -1363,36 +1345,7 @@ class FakeDefaultSource extends FakeSource {
   }
 }
-/** A fake source that throws the same IOException like pre Hadoop 2.8 when it's interrupted. */
-class ThrowingIOExceptionLikeHadoop12074 extends FakeSource {
-  import ThrowingIOExceptionLikeHadoop12074._
-
-  override def createSource(
-      spark: SQLContext,
-      metadataPath: String,
-      schema: Option[StructType],
-      providerName: String,
-      parameters: Map[String, String]): Source = {
-    createSourceLatch.countDown()
-    try {
-      Thread.sleep(30000)
-      throw new TimeoutException("sleep was not interrupted in 30 seconds")
-    } catch {
-      case ie: InterruptedException =>
-        throw new IOException(ie.toString)
-    }
-  }
-}
-
-object ThrowingIOExceptionLikeHadoop12074 {
-  /**
-   * A latch to allow the user to wait until `ThrowingIOExceptionLikeHadoop12074.createSource` is
-   * called.
-   */
-  @volatile var createSourceLatch: CountDownLatch = null
-}
-
-/** A fake source that throws InterruptedIOException like Hadoop 2.8+ when it's interrupted. */
+/** A fake source that throws InterruptedIOException like Hadoop when it's interrupted. */
 class ThrowingInterruptedIOException extends FakeSource {
   import ThrowingInterruptedIOException._
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]