This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5b47841 [SPARK-28208][SQL][FOLLOWUP] Use `tryWithResource` pattern
5b47841 is described below
commit 5b478416f8e3fe2f015af1b6c8faa7fe9f15c05d
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Sep 19 15:33:12 2019 -0700
[SPARK-28208][SQL][FOLLOWUP] Use `tryWithResource` pattern
### What changes were proposed in this pull request?
This PR aims to use the `tryWithResource` pattern when creating ORC file readers, so that each reader is closed even when an exception is thrown.
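For context, `Utils.tryWithResource` (in `org.apache.spark.util.Utils`) is Spark's loan-pattern helper: it creates a resource, lends it to a function, and closes it in a `finally` block. Below is a minimal sketch of the pattern, assuming the resource implements `java.io.Closeable`; the wrapping object name is only for the sketch, and this is not the exact Spark implementation:

```scala
import java.io.Closeable

object TryWithResourceSketch {
  // Loan pattern: create the resource, lend it to `f`, and close it in
  // `finally` so it is released even when `f` throws.
  def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
    val resource = createResource
    try f(resource) finally resource.close()
  }
}
```

Applied to this patch, the ORC `Reader` returned by `OrcFile.createReader` is now closed on the exception path as well, which the previous explicit `reader.close()` calls did not guarantee.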
### Why are the changes needed?
This is a follow-up to address https://github.com/apache/spark/pull/25006#discussion_r298788206 .
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Pass the Jenkins with the existing tests.
Closes #25842 from dongjoon-hyun/SPARK-28208.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../execution/datasources/orc/OrcFileFormat.scala | 13 ++++----
.../sql/execution/datasources/orc/OrcUtils.scala | 8 ++---
.../v2/orc/OrcPartitionReaderFactory.scala | 22 ++++++-------
.../execution/datasources/orc/OrcQuerySuite.scala | 36 +++++++++++-----------
.../execution/datasources/orc/OrcSourceSuite.scala | 8 ++---
.../spark/sql/hive/execution/HiveDDLSuite.scala | 10 +++---
6 files changed, 48 insertions(+), 49 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 59ee63ae..12c81a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -38,10 +38,9 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
private[sql] object OrcFileFormat {
private def checkFieldName(name: String): Unit = {
@@ -180,11 +179,11 @@ class OrcFileFormat
val fs = filePath.getFileSystem(conf)
val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
- val reader = OrcFile.createReader(filePath, readerOptions)
-
- val requestedColIdsOrEmptyFile = OrcUtils.requestedColumnIds(
- isCaseSensitive, dataSchema, requiredSchema, reader, conf)
- reader.close()
+ val requestedColIdsOrEmptyFile =
+ Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
+ OrcUtils.requestedColumnIds(
+ isCaseSensitive, dataSchema, requiredSchema, reader, conf)
+ }
if (requestedColIdsOrEmptyFile.isEmpty) {
Iterator.empty
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index f3c92f3..eea9b2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.SchemaMergeUtils
import org.apache.spark.sql.types._
-import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
+import org.apache.spark.util.{ThreadUtils, Utils}
object OrcUtils extends Logging {
@@ -62,9 +62,9 @@ object OrcUtils extends Logging {
val fs = file.getFileSystem(conf)
val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
try {
- val reader = OrcFile.createReader(file, readerOptions)
- val schema = reader.getSchema
- reader.close()
+ val schema = Utils.tryWithResource(OrcFile.createReader(file, readerOptions)) { reader =>
+ reader.getSchema
+ }
if (schema.getFieldNames.size == 0) {
None
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
index 7780ce4..03d58fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{AtomicType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
/**
* A factory used to create Orc readers.
@@ -74,11 +74,11 @@ case class OrcPartitionReaderFactory(
val fs = filePath.getFileSystem(conf)
val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
- val reader = OrcFile.createReader(filePath, readerOptions)
-
- val requestedColIdsOrEmptyFile = OrcUtils.requestedColumnIds(
- isCaseSensitive, dataSchema, readDataSchema, reader, conf)
- reader.close()
+ val requestedColIdsOrEmptyFile =
+ Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
+ OrcUtils.requestedColumnIds(
+ isCaseSensitive, dataSchema, readDataSchema, reader, conf)
+ }
if (requestedColIdsOrEmptyFile.isEmpty) {
new EmptyPartitionReader[InternalRow]
@@ -120,11 +120,11 @@ case class OrcPartitionReaderFactory(
val fs = filePath.getFileSystem(conf)
val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
- val reader = OrcFile.createReader(filePath, readerOptions)
-
- val requestedColIdsOrEmptyFile = OrcUtils.requestedColumnIds(
- isCaseSensitive, dataSchema, readDataSchema, reader, conf)
- reader.close()
+ val requestedColIdsOrEmptyFile =
+ Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
+ OrcUtils.requestedColumnIds(
+ isCaseSensitive, dataSchema, readDataSchema, reader, conf)
+ }
if (requestedColIdsOrEmptyFile.isEmpty) {
new EmptyPartitionReader
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index c334d01..b8bf4b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -194,9 +194,9 @@ abstract class OrcQueryTest extends OrcTest {
val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
val conf = OrcFile.readerOptions(new Configuration())
- val reader = OrcFile.createReader(orcFilePath, conf)
- assert("ZLIB" === reader.getCompressionKind.name)
- reader.close()
+ Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
+ assert("ZLIB" === reader.getCompressionKind.name)
+ }
}
// `compression` overrides `orc.compress`.
@@ -211,9 +211,9 @@ abstract class OrcQueryTest extends OrcTest {
val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
val conf = OrcFile.readerOptions(new Configuration())
- val reader = OrcFile.createReader(orcFilePath, conf)
- assert("ZLIB" === reader.getCompressionKind.name)
- reader.close()
+ Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
+ assert("ZLIB" === reader.getCompressionKind.name)
+ }
}
}
@@ -229,9 +229,9 @@ abstract class OrcQueryTest extends OrcTest {
val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
val conf = OrcFile.readerOptions(new Configuration())
- val reader = OrcFile.createReader(orcFilePath, conf)
- assert("ZLIB" === reader.getCompressionKind.name)
- reader.close()
+ Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
+ assert("ZLIB" === reader.getCompressionKind.name)
+ }
}
withTempPath { file =>
@@ -244,9 +244,9 @@ abstract class OrcQueryTest extends OrcTest {
val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
val conf = OrcFile.readerOptions(new Configuration())
- val reader = OrcFile.createReader(orcFilePath, conf)
- assert("SNAPPY" === reader.getCompressionKind.name)
- reader.close()
+ Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
+ assert("SNAPPY" === reader.getCompressionKind.name)
+ }
}
withTempPath { file =>
@@ -259,9 +259,9 @@ abstract class OrcQueryTest extends OrcTest {
val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
val conf = OrcFile.readerOptions(new Configuration())
- val reader = OrcFile.createReader(orcFilePath, conf)
- assert("NONE" === reader.getCompressionKind.name)
- reader.close()
+ Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
+ assert("NONE" === reader.getCompressionKind.name)
+ }
}
}
@@ -645,9 +645,9 @@ class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
val conf = OrcFile.readerOptions(new Configuration())
- val reader = OrcFile.createReader(orcFilePath, conf)
- assert("LZO" === reader.getCompressionKind.name)
- reader.close()
+ Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
+ assert("LZO" === reader.getCompressionKind.name)
+ }
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index 70d15bb..a2d96dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -372,10 +372,10 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
val orcFilePath = new Path(partFiles.head.getAbsolutePath)
val readerOptions = OrcFile.readerOptions(new Configuration())
- val reader = OrcFile.createReader(orcFilePath, readerOptions)
- val version = UTF_8.decode(reader.getMetadataValue(SPARK_VERSION_METADATA_KEY)).toString
- reader.close()
- assert(version === SPARK_VERSION_SHORT)
+ Utils.tryWithResource(OrcFile.createReader(orcFilePath, readerOptions)) { reader =>
+ val version = UTF_8.decode(reader.getMetadataValue(SPARK_VERSION_METADATA_KEY)).toString
+ assert(version === SPARK_VERSION_SHORT)
+ }
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 487e888..4253fe2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -2366,11 +2366,11 @@ class HiveDDLSuite
checkAnswer(spark.table("t"), Row(1))
val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
- val reader = getReader(maybeFile.head.getCanonicalPath)
- assert(reader.getCompressionKind.name === "ZLIB")
- assert(reader.getCompressionSize == 1001)
- assert(reader.getRowIndexStride == 2002)
- reader.close()
+ Utils.tryWithResource(getReader(maybeFile.head.getCanonicalPath)) { reader =>
+ assert(reader.getCompressionKind.name === "ZLIB")
+ assert(reader.getCompressionSize == 1001)
+ assert(reader.getRowIndexStride == 2002)
+ }
}
}
}