Repository: spark Updated Branches: refs/heads/master 6563d72b1 -> 1f43562da
[SPARK-14343][SQL] Proper column pruning for text data source ## What changes were proposed in this pull request? Text data source ignores requested schema, and may give wrong result when the only data column is not requested. This may happen when only partitioning column(s) are requested for a partitioned text table. ## How was this patch tested? New test case added in `TextSuite`. Author: Cheng Lian <[email protected]> Closes #13431 from liancheng/spark-14343-partitioned-text-table. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f43562d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f43562d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f43562d Branch: refs/heads/master Commit: 1f43562daf9454428796317203d9dcc9030a46eb Parents: 6563d72 Author: Cheng Lian <[email protected]> Authored: Wed Jun 1 07:30:55 2016 -0700 Committer: Cheng Lian <[email protected]> Committed: Wed Jun 1 07:30:55 2016 -0700 ---------------------------------------------------------------------- .../datasources/text/TextFileFormat.scala | 31 +++++++++++++------- .../execution/datasources/text/TextSuite.scala | 17 +++++++++-- 2 files changed, 35 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1f43562d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index 1e5bce4..9c03ab2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -92,20 +92,31 @@ class TextFileFormat extends FileFormat with DataSourceRegister { filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + assert( + requiredSchema.length <= 1, + "Text data source only produces a single data column named \"value\".") + val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) (file: PartitionedFile) => { - val unsafeRow = new UnsafeRow(1) - val bufferHolder = new BufferHolder(unsafeRow) - val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1) - - new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map { line => - // Writes to an UnsafeRow directly - bufferHolder.reset() - unsafeRowWriter.write(0, line.getBytes, 0, line.getLength) - unsafeRow.setTotalSize(bufferHolder.totalSize()) - unsafeRow + val reader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value) + + if (requiredSchema.isEmpty) { + val emptyUnsafeRow = new UnsafeRow(0) + reader.map(_ => emptyUnsafeRow) + } else { + val unsafeRow = new UnsafeRow(1) + val bufferHolder = new BufferHolder(unsafeRow) + val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1) + + reader.map { line => + // Writes to an UnsafeRow directly + bufferHolder.reset() + unsafeRowWriter.write(0, line.getBytes, 0, line.getLength) + unsafeRow.setTotalSize(bufferHolder.totalSize()) + unsafeRow + } } } } http://git-wip-us.apache.org/repos/asf/spark/blob/1f43562d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index b5e51e9..7b6981f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -19,9 +19,6 @@ package org.apache.spark.sql.execution.datasources.text import java.io.File -import scala.collection.JavaConverters._ - -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec @@ -31,6 +28,7 @@ import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.util.Utils class TextSuite extends QueryTest with SharedSQLContext { + import testImplicits._ test("reading text file") { verifyFrame(spark.read.format("text").load(testFile)) @@ -126,6 +124,19 @@ class TextSuite extends QueryTest with SharedSQLContext { } } + test("SPARK-14343: select partitioning column") { + withTempPath { dir => + val path = dir.getCanonicalPath + val ds1 = spark.range(1).selectExpr("CONCAT('val_', id)") + ds1.write.text(s"$path/part=a") + ds1.write.text(s"$path/part=b") + + checkDataset( + spark.read.format("text").load(path).select($"part"), + Row("a"), Row("b")) + } + } + private def testFile: String = { Thread.currentThread().getContextClassLoader.getResource("text-suite.txt").toString } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
