This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new facf9c3 [SPARK-28204][SQL][TESTS] Make two separate test cases for column pruning in binary files
facf9c3 is described below
commit facf9c30a283ec682b5adb2e7afdbf5d011e3808
Author: HyukjinKwon <[email protected]>
AuthorDate: Sat Jun 29 14:05:23 2019 +0900
[SPARK-28204][SQL][TESTS] Make two separate test cases for column pruning in binary files
## What changes were proposed in this pull request?
SPARK-27534 missed addressing my own review comments at
https://github.com/WeichenXu123/spark/pull/8
It's better to push this in since the code is already cleaned up.
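For context, here is a minimal sketch (not part of this change) of the column-pruning behavior these test cases cover, using the public `binaryFile` data source; the input path below is hypothetical:

```scala
import org.apache.spark.sql.SparkSession

object BinaryFilePruningExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("binary-file-pruning")
      .getOrCreate()

    // The binaryFile source exposes path, modificationTime, length and content columns.
    // Selecting only metadata columns lets the reader skip loading the file bytes,
    // which is the pruning behavior the test cases in the diff below exercise.
    val df = spark.read.format("binaryFile").load("/tmp/some-binary-files") // hypothetical path
    df.select("path", "length").show() // should not read file contents
    println(df.count())                // count should not read contents either

    spark.stop()
  }
}
```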
## How was this patch tested?
Unit tests fixed
Closes #25003 from HyukjinKwon/SPARK-27534.
Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
---
.../binaryfile/BinaryFileFormatSuite.scala | 88 +++++++++++-----------
1 file changed, 43 insertions(+), 45 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index 9e2969b..a66b34f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -290,56 +290,54 @@ class BinaryFileFormatSuite extends QueryTest with SharedSQLContext with SQLTest
), true)
}
+ private def readBinaryFile(file: File, requiredSchema: StructType): Row = {
+ val format = new BinaryFileFormat
+ val reader = format.buildReaderWithPartitionValues(
+ sparkSession = spark,
+ dataSchema = schema,
+ partitionSchema = StructType(Nil),
+ requiredSchema = requiredSchema,
+ filters = Seq.empty,
+ options = Map.empty,
+ hadoopConf = spark.sessionState.newHadoopConf()
+ )
+ val partitionedFile = mock(classOf[PartitionedFile])
+ when(partitionedFile.filePath).thenReturn(file.getPath)
+ val encoder = RowEncoder(requiredSchema).resolveAndBind()
+ encoder.fromRow(reader(partitionedFile).next())
+ }
+
test("column pruning") {
- def getRequiredSchema(fieldNames: String*): StructType = {
- StructType(fieldNames.map {
- case f if schema.fieldNames.contains(f) => schema(f)
- case other => StructField(other, NullType)
- })
- }
- def read(file: File, requiredSchema: StructType): Row = {
- val format = new BinaryFileFormat
- val reader = format.buildReaderWithPartitionValues(
- sparkSession = spark,
- dataSchema = schema,
- partitionSchema = StructType(Nil),
- requiredSchema = requiredSchema,
- filters = Seq.empty,
- options = Map.empty,
- hadoopConf = spark.sessionState.newHadoopConf()
- )
- val partitionedFile = mock(classOf[PartitionedFile])
- when(partitionedFile.filePath).thenReturn(file.getPath)
- val encoder = RowEncoder(requiredSchema).resolveAndBind()
- encoder.fromRow(reader(partitionedFile).next())
- }
- val file = new File(Utils.createTempDir(), "data")
- val content = "123".getBytes
- Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
-
- read(file, getRequiredSchema(MODIFICATION_TIME, CONTENT, LENGTH, PATH)) match {
- case Row(t, c, len, p) =>
- assert(t === new Timestamp(file.lastModified()))
- assert(c === content)
- assert(len === content.length)
- assert(p.asInstanceOf[String].endsWith(file.getAbsolutePath))
+ withTempPath { file =>
+ val content = "123".getBytes
+ Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
+
+ val actual = readBinaryFile(file, StructType(schema.takeRight(3)))
+ val expected = Row(new Timestamp(file.lastModified()), content.length, content)
+
+ assert(actual === expected)
}
- file.setReadable(false)
- withClue("cannot read content") {
+ }
+
+ test("column pruning - non-readable file") {
+ withTempPath { file =>
+ val content = "abc".getBytes
+ Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
+ file.setReadable(false)
+
+ // If content is selected, it throws an exception because it's not readable.
intercept[IOException] {
- read(file, getRequiredSchema(CONTENT))
+ readBinaryFile(file, StructType(schema(CONTENT) :: Nil))
}
- }
- assert(read(file, getRequiredSchema(LENGTH)) === Row(content.length),
- "Get length should not read content.")
- intercept[RuntimeException] {
- read(file, getRequiredSchema(LENGTH, "other"))
- }
- val df = spark.read.format(BINARY_FILE).load(file.getPath)
- assert(df.count() === 1, "Count should not read content.")
- assert(df.select("LENGTH").first().getLong(0) === content.length,
- "column pruning should be case insensitive")
+ // Otherwise, it should be able to read.
+ assert(
+ readBinaryFile(file, StructType(schema(LENGTH) :: Nil)) === Row(content.length),
+ "Get length should not read content.")
+ assert(
+ spark.read.format(BINARY_FILE).load(file.getPath).count() === 1,
+ "Count should not read content.")
+ }
}
test("fail fast and do not attempt to read if a file is too big") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]