This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 7ea3195 [SPARK-29101][SQL][2.4] Fix count API for csv file when
DROPMALFORMED mode is selected
7ea3195 is described below
commit 7ea319521f1e7d0d54fd1d85d743e9717d73841d
Author: sandeep katta <[email protected]>
AuthorDate: Thu Sep 19 15:24:07 2019 -0700
[SPARK-29101][SQL][2.4] Fix count API for csv file when DROPMALFORMED mode
is selected
### What changes were proposed in this pull request?
# Dataset used:
fruit,color,price,quantity
apple,red,1,3
banana,yellow,2,4
orange,orange,3,5
xxx
This PR aims to fix the incorrect count shown below:
```
scala> spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", false)
scala> spark.read.option("header", "true").option("mode",
"DROPMALFORMED").csv("fruit.csv").count
res1: Long = 4
```
This is caused by the issue
[SPARK-24645](https://issues.apache.org/jira/browse/SPARK-24645).
The SPARK-24645 issue can also be solved by
[SPARK-25387](https://issues.apache.org/jira/browse/SPARK-25387).
### Why are the changes needed?
SPARK-24645 caused this regression, so its change is reverted here; the
original issue is also solved by SPARK-25387.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Added UT, and also tested the bug SPARK-24645
**SPARK-24645 regression**

Closes #25843 from sandeep-katta/SPARK-29101_branch2.4.
Authored-by: sandeep katta <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sql/execution/datasources/csv/UnivocityParser.scala | 7 ++++---
sql/core/src/test/resources/test-data/malformedRow.csv | 5 +++++
.../spark/sql/execution/datasources/csv/CSVSuite.scala | 14 ++++++++++++++
3 files changed, 23 insertions(+), 3 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 42e3964..69bd11f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -203,12 +203,13 @@ class UnivocityParser(
}
}
- private val doParse = if (requiredSchema.nonEmpty) {
- (input: String) => convert(tokenizer.parseLine(input))
- } else {
+ private val doParse = if (options.columnPruning && requiredSchema.isEmpty) {
// If `columnPruning` enabled and partition attributes scanned only,
// `schema` gets empty.
(_: String) => InternalRow.empty
+ } else {
+ // parse if the columnPruning is disabled or requiredSchema is nonEmpty
+ (input: String) => convert(tokenizer.parseLine(input))
}
/**
diff --git a/sql/core/src/test/resources/test-data/malformedRow.csv
b/sql/core/src/test/resources/test-data/malformedRow.csv
new file mode 100644
index 0000000..8cfb3eef
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/malformedRow.csv
@@ -0,0 +1,5 @@
+fruit,color,price,quantity
+apple,red,1,3
+banana,yellow,2,4
+orange,orange,3,5
+malformedrow
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index df9d154..d714cb2 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -62,6 +62,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with
SQLTestUtils with Te
private val datesFile = "test-data/dates.csv"
private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
private val valueMalformedFile = "test-data/value-malformed.csv"
+ private val malformedRowFile = "test-data/malformedRow.csv"
/** Verifies data and schema. */
private def verifyCars(
@@ -1861,4 +1862,17 @@ class CSVSuite extends QueryTest with SharedSQLContext
with SQLTestUtils with Te
}
}
}
+
+ test("SPARK-29101 test count with DROPMALFORMED mode") {
+ Seq((true, 4), (false, 3)).foreach { case (csvColumnPruning,
expectedCount) =>
+ withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key ->
csvColumnPruning.toString) {
+ val count = spark.read
+ .option("header", "true")
+ .option("mode", "DROPMALFORMED")
+ .csv(testFile(malformedRowFile))
+ .count()
+ assert(expectedCount == count)
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]