This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b47841  [SPARK-28208][SQL][FOLLOWUP] Use `tryWithResource` pattern
5b47841 is described below

commit 5b478416f8e3fe2f015af1b6c8faa7fe9f15c05d
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Sep 19 15:33:12 2019 -0700

    [SPARK-28208][SQL][FOLLOWUP] Use `tryWithResource` pattern
    
    ### What changes were proposed in this pull request?
    
    This PR aims to use the `Utils.tryWithResource` pattern when opening ORC file readers, instead of calling `reader.close()` manually.
    
    ### Why are the changes needed?
    
    This is a follow-up to address https://github.com/apache/spark/pull/25006#discussion_r298788206 .
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass Jenkins with the existing tests.
    
    Closes #25842 from dongjoon-hyun/SPARK-28208.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../execution/datasources/orc/OrcFileFormat.scala  | 13 ++++----
 .../sql/execution/datasources/orc/OrcUtils.scala   |  8 ++---
 .../v2/orc/OrcPartitionReaderFactory.scala         | 22 ++++++-------
 .../execution/datasources/orc/OrcQuerySuite.scala  | 36 +++++++++++-----------
 .../execution/datasources/orc/OrcSourceSuite.scala |  8 ++---
 .../spark/sql/hive/execution/HiveDDLSuite.scala    | 10 +++---
 6 files changed, 48 insertions(+), 49 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 59ee63ae..12c81a1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -38,10 +38,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 private[sql] object OrcFileFormat {
   private def checkFieldName(name: String): Unit = {
@@ -180,11 +179,11 @@ class OrcFileFormat
 
       val fs = filePath.getFileSystem(conf)
       val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-      val reader = OrcFile.createReader(filePath, readerOptions)
-
-      val requestedColIdsOrEmptyFile = OrcUtils.requestedColumnIds(
-        isCaseSensitive, dataSchema, requiredSchema, reader, conf)
-      reader.close()
+      val requestedColIdsOrEmptyFile =
+        Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { 
reader =>
+          OrcUtils.requestedColumnIds(
+            isCaseSensitive, dataSchema, requiredSchema, reader, conf)
+        }
 
       if (requestedColIdsOrEmptyFile.isEmpty) {
         Iterator.empty
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index f3c92f3..eea9b2a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.execution.datasources.SchemaMergeUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 object OrcUtils extends Logging {
 
@@ -62,9 +62,9 @@ object OrcUtils extends Logging {
     val fs = file.getFileSystem(conf)
     val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
     try {
-      val reader = OrcFile.createReader(file, readerOptions)
-      val schema = reader.getSchema
-      reader.close()
+      val schema = Utils.tryWithResource(OrcFile.createReader(file, 
readerOptions)) { reader =>
+        reader.getSchema
+      }
       if (schema.getFieldNames.size == 0) {
         None
       } else {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
index 7780ce4..03d58fd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{AtomicType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 /**
  * A factory used to create Orc readers.
@@ -74,11 +74,11 @@ case class OrcPartitionReaderFactory(
 
     val fs = filePath.getFileSystem(conf)
     val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-    val reader = OrcFile.createReader(filePath, readerOptions)
-
-    val requestedColIdsOrEmptyFile = OrcUtils.requestedColumnIds(
-      isCaseSensitive, dataSchema, readDataSchema, reader, conf)
-    reader.close()
+    val requestedColIdsOrEmptyFile =
+      Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { 
reader =>
+        OrcUtils.requestedColumnIds(
+          isCaseSensitive, dataSchema, readDataSchema, reader, conf)
+      }
 
     if (requestedColIdsOrEmptyFile.isEmpty) {
       new EmptyPartitionReader[InternalRow]
@@ -120,11 +120,11 @@ case class OrcPartitionReaderFactory(
 
     val fs = filePath.getFileSystem(conf)
     val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-    val reader = OrcFile.createReader(filePath, readerOptions)
-
-    val requestedColIdsOrEmptyFile = OrcUtils.requestedColumnIds(
-      isCaseSensitive, dataSchema, readDataSchema, reader, conf)
-    reader.close()
+    val requestedColIdsOrEmptyFile =
+      Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { 
reader =>
+        OrcUtils.requestedColumnIds(
+          isCaseSensitive, dataSchema, readDataSchema, reader, conf)
+      }
 
     if (requestedColIdsOrEmptyFile.isEmpty) {
       new EmptyPartitionReader
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index c334d01..b8bf4b1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -194,9 +194,9 @@ abstract class OrcQueryTest extends OrcTest {
 
       val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
       val conf = OrcFile.readerOptions(new Configuration())
-      val reader = OrcFile.createReader(orcFilePath, conf)
-      assert("ZLIB" === reader.getCompressionKind.name)
-      reader.close()
+      Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader 
=>
+        assert("ZLIB" === reader.getCompressionKind.name)
+      }
     }
 
     // `compression` overrides `orc.compress`.
@@ -211,9 +211,9 @@ abstract class OrcQueryTest extends OrcTest {
 
       val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
       val conf = OrcFile.readerOptions(new Configuration())
-      val reader = OrcFile.createReader(orcFilePath, conf)
-      assert("ZLIB" === reader.getCompressionKind.name)
-      reader.close()
+      Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader 
=>
+        assert("ZLIB" === reader.getCompressionKind.name)
+      }
     }
   }
 
@@ -229,9 +229,9 @@ abstract class OrcQueryTest extends OrcTest {
 
       val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
       val conf = OrcFile.readerOptions(new Configuration())
-      val reader = OrcFile.createReader(orcFilePath, conf)
-      assert("ZLIB" === reader.getCompressionKind.name)
-      reader.close()
+      Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader 
=>
+        assert("ZLIB" === reader.getCompressionKind.name)
+      }
     }
 
     withTempPath { file =>
@@ -244,9 +244,9 @@ abstract class OrcQueryTest extends OrcTest {
 
       val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
       val conf = OrcFile.readerOptions(new Configuration())
-      val reader = OrcFile.createReader(orcFilePath, conf)
-      assert("SNAPPY" === reader.getCompressionKind.name)
-      reader.close()
+      Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader 
=>
+        assert("SNAPPY" === reader.getCompressionKind.name)
+      }
     }
 
     withTempPath { file =>
@@ -259,9 +259,9 @@ abstract class OrcQueryTest extends OrcTest {
 
       val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
       val conf = OrcFile.readerOptions(new Configuration())
-      val reader = OrcFile.createReader(orcFilePath, conf)
-      assert("NONE" === reader.getCompressionKind.name)
-      reader.close()
+      Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader 
=>
+        assert("NONE" === reader.getCompressionKind.name)
+      }
     }
   }
 
@@ -645,9 +645,9 @@ class OrcQuerySuite extends OrcQueryTest with 
SharedSparkSession {
 
       val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
       val conf = OrcFile.readerOptions(new Configuration())
-      val reader = OrcFile.createReader(orcFilePath, conf)
-      assert("LZO" === reader.getCompressionKind.name)
-      reader.close()
+      Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader 
=>
+        assert("LZO" === reader.getCompressionKind.name)
+      }
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index 70d15bb..a2d96dd 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -372,10 +372,10 @@ abstract class OrcSuite extends OrcTest with 
BeforeAndAfterAll {
 
       val orcFilePath = new Path(partFiles.head.getAbsolutePath)
       val readerOptions = OrcFile.readerOptions(new Configuration())
-      val reader = OrcFile.createReader(orcFilePath, readerOptions)
-      val version = 
UTF_8.decode(reader.getMetadataValue(SPARK_VERSION_METADATA_KEY)).toString
-      reader.close()
-      assert(version === SPARK_VERSION_SHORT)
+      Utils.tryWithResource(OrcFile.createReader(orcFilePath, readerOptions)) 
{ reader =>
+        val version = 
UTF_8.decode(reader.getMetadataValue(SPARK_VERSION_METADATA_KEY)).toString
+        assert(version === SPARK_VERSION_SHORT)
+      }
     }
   }
 
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 487e888..4253fe2 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -2366,11 +2366,11 @@ class HiveDDLSuite
             checkAnswer(spark.table("t"), Row(1))
             val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
 
-            val reader = getReader(maybeFile.head.getCanonicalPath)
-            assert(reader.getCompressionKind.name === "ZLIB")
-            assert(reader.getCompressionSize == 1001)
-            assert(reader.getRowIndexStride == 2002)
-            reader.close()
+            Utils.tryWithResource(getReader(maybeFile.head.getCanonicalPath)) 
{ reader =>
+              assert(reader.getCompressionKind.name === "ZLIB")
+              assert(reader.getCompressionSize == 1001)
+              assert(reader.getRowIndexStride == 2002)
+            }
           }
         }
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to