This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 43852733307a [SPARK-45618][CORE] Remove BaseErrorHandler
43852733307a is described below

commit 43852733307a229944bd254f38bcc1f84bca97fd
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Mon Oct 23 22:00:17 2023 -0700

    [SPARK-45618][CORE] Remove BaseErrorHandler
    
    ### What changes were proposed in this pull request?
    
    This patch removes a workaround trait `BaseErrorHandler` which was added 
a long time ago (SPARK-25535) for 
[CRYPTO-141](https://issues.apache.org/jira/browse/CRYPTO-141), which was fixed 
5 years ago.
    
    ### Why are the changes needed?
    
    Removing unnecessary workaround code.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43468 from viirya/remove_workaround.
    
    Authored-by: Liang-Chi Hsieh <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/spark/security/CryptoStreamUtils.scala  | 137 ++-------------------
 .../spark/security/CryptoStreamUtilsSuite.scala    |  35 +-----
 2 files changed, 10 insertions(+), 162 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala 
b/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
index 20a4147f7d80..409223132a62 100644
--- a/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
+++ b/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.security
 
-import java.io.{Closeable, InputStream, IOException, OutputStream}
+import java.io.{InputStream, OutputStream}
 import java.nio.ByteBuffer
 import java.nio.channels.{ReadableByteChannel, WritableByteChannel}
 import java.util.Properties
@@ -55,10 +55,8 @@ private[spark] object CryptoStreamUtils extends Logging {
     val params = new CryptoParams(key, sparkConf)
     val iv = createInitializationVector(params.conf)
     os.write(iv)
-    new ErrorHandlingOutputStream(
-      new CryptoOutputStream(params.transformation, params.conf, os, 
params.keySpec,
-        new IvParameterSpec(iv)),
-      os)
+    new CryptoOutputStream(params.transformation, params.conf, os, 
params.keySpec,
+      new IvParameterSpec(iv))
   }
 
   /**
@@ -73,10 +71,8 @@ private[spark] object CryptoStreamUtils extends Logging {
     val helper = new CryptoHelperChannel(channel)
 
     helper.write(ByteBuffer.wrap(iv))
-    new ErrorHandlingWritableChannel(
-      new CryptoOutputStream(params.transformation, params.conf, helper, 
params.keySpec,
-        new IvParameterSpec(iv)),
-      helper)
+    new CryptoOutputStream(params.transformation, params.conf, helper, 
params.keySpec,
+      new IvParameterSpec(iv))
   }
 
   /**
@@ -89,10 +85,8 @@ private[spark] object CryptoStreamUtils extends Logging {
     val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
     ByteStreams.readFully(is, iv)
     val params = new CryptoParams(key, sparkConf)
-    new ErrorHandlingInputStream(
-      new CryptoInputStream(params.transformation, params.conf, is, 
params.keySpec,
-        new IvParameterSpec(iv)),
-      is)
+    new CryptoInputStream(params.transformation, params.conf, is, 
params.keySpec,
+      new IvParameterSpec(iv))
   }
 
   /**
@@ -107,10 +101,8 @@ private[spark] object CryptoStreamUtils extends Logging {
     JavaUtils.readFully(channel, buf)
 
     val params = new CryptoParams(key, sparkConf)
-    new ErrorHandlingReadableChannel(
-      new CryptoInputStream(params.transformation, params.conf, channel, 
params.keySpec,
-        new IvParameterSpec(iv)),
-      channel)
+    new CryptoInputStream(params.transformation, params.conf, channel, 
params.keySpec,
+      new IvParameterSpec(iv))
   }
 
   def toCryptoConf(conf: SparkConf): Properties = {
@@ -166,117 +158,6 @@ private[spark] object CryptoStreamUtils extends Logging {
 
   }
 
-  /**
-   * SPARK-25535. The commons-crypto library will throw InternalError if 
something goes
-   * wrong, and leave bad state behind in the Java wrappers, so it's not safe 
to use them
-   * afterwards. This wrapper detects that situation and avoids further calls 
into the
-   * commons-crypto code, while still allowing the underlying streams to be 
closed.
-   *
-   * This should be removed once CRYPTO-141 is fixed (and Spark upgrades its 
commons-crypto
-   * dependency).
-   */
-  trait BaseErrorHandler extends Closeable {
-
-    private var closed = false
-
-    /** The encrypted stream that may get into an unhealthy state. */
-    protected def cipherStream: Closeable
-
-    /**
-     * The underlying stream that is being wrapped by the encrypted stream, so 
that it can be
-     * closed even if there's an error in the crypto layer.
-     */
-    protected def original: Closeable
-
-    protected def safeCall[T](fn: => T): T = {
-      if (closed) {
-        throw new IOException("Cipher stream is closed.")
-      }
-      try {
-        fn
-      } catch {
-        case ie: InternalError =>
-          closed = true
-          original.close()
-          throw ie
-      }
-    }
-
-    override def close(): Unit = {
-      if (!closed) {
-        cipherStream.close()
-      }
-    }
-
-  }
-
-  // Visible for testing.
-  class ErrorHandlingReadableChannel(
-      protected val cipherStream: ReadableByteChannel,
-      protected val original: ReadableByteChannel)
-    extends ReadableByteChannel with BaseErrorHandler {
-
-    override def read(src: ByteBuffer): Int = safeCall {
-      cipherStream.read(src)
-    }
-
-    override def isOpen(): Boolean = cipherStream.isOpen()
-
-  }
-
-  private class ErrorHandlingInputStream(
-      protected val cipherStream: InputStream,
-      protected val original: InputStream)
-    extends InputStream with BaseErrorHandler {
-
-    override def read(b: Array[Byte]): Int = safeCall {
-      cipherStream.read(b)
-    }
-
-    override def read(b: Array[Byte], off: Int, len: Int): Int = safeCall {
-      cipherStream.read(b, off, len)
-    }
-
-    override def read(): Int = safeCall {
-      cipherStream.read()
-    }
-  }
-
-  private class ErrorHandlingWritableChannel(
-      protected val cipherStream: WritableByteChannel,
-      protected val original: WritableByteChannel)
-    extends WritableByteChannel with BaseErrorHandler {
-
-    override def write(src: ByteBuffer): Int = safeCall {
-      cipherStream.write(src)
-    }
-
-    override def isOpen(): Boolean = cipherStream.isOpen()
-
-  }
-
-  private class ErrorHandlingOutputStream(
-      protected val cipherStream: OutputStream,
-      protected val original: OutputStream)
-    extends OutputStream with BaseErrorHandler {
-
-    override def flush(): Unit = safeCall {
-      cipherStream.flush()
-    }
-
-    override def write(b: Array[Byte]): Unit = safeCall {
-      cipherStream.write(b)
-    }
-
-    override def write(b: Array[Byte], off: Int, len: Int): Unit = safeCall {
-      cipherStream.write(b, off, len)
-    }
-
-    override def write(b: Int): Unit = safeCall {
-      cipherStream.write(b)
-    }
-  }
-
   private class CryptoParams(key: Array[Byte], sparkConf: SparkConf) {
 
     val keySpec = new SecretKeySpec(key, "AES")
diff --git 
a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
index c3d96e7c42af..27a53e820520 100644
--- a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
@@ -17,15 +17,12 @@
 package org.apache.spark.security
 
 import java.io._
-import java.nio.ByteBuffer
-import java.nio.channels.{Channels, ReadableByteChannel}
+import java.nio.channels.Channels
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.Files
 import java.util.{Arrays, Random, UUID}
 
 import com.google.common.io.ByteStreams
-import org.mockito.ArgumentMatchers.any
-import org.mockito.Mockito._
 
 import org.apache.spark._
 import org.apache.spark.internal.config._
@@ -167,36 +164,6 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {
     }
   }
 
-  test("error handling wrapper") {
-    val wrapped = mock(classOf[ReadableByteChannel])
-    val decrypted = mock(classOf[ReadableByteChannel])
-    val errorHandler = new 
CryptoStreamUtils.ErrorHandlingReadableChannel(decrypted, wrapped)
-
-    when(decrypted.read(any(classOf[ByteBuffer])))
-      .thenThrow(new IOException())
-      .thenThrow(new InternalError())
-      .thenReturn(1)
-
-    val out = ByteBuffer.allocate(1)
-    intercept[IOException] {
-      errorHandler.read(out)
-    }
-    intercept[InternalError] {
-      errorHandler.read(out)
-    }
-
-    val e = intercept[IOException] {
-      errorHandler.read(out)
-    }
-    assert(e.getMessage().contains("is closed"))
-    errorHandler.close()
-
-    verify(decrypted, times(2)).read(any(classOf[ByteBuffer]))
-    verify(wrapped, never()).read(any(classOf[ByteBuffer]))
-    verify(decrypted, never()).close()
-    verify(wrapped, times(1)).close()
-  }
-
   private def createConf(extra: (String, String)*): SparkConf = {
     val conf = new SparkConf()
     extra.foreach { case (k, v) => conf.set(k, v) }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to