Repository: spark
Updated Branches:
refs/heads/master 520d92a19 -> 6273a711b
[SPARK-21610][SQL] Corrupt records are not handled properly when creating a dataframe from a file
## What changes were proposed in this pull request?
```
echo '{"field": 1}
{"field": 2}
{"field": "3"}' >/tmp/sample.json
```
```scala
import org.apache.spark.sql.types._
val schema = new StructType()
.add("field", ByteType)
.add("_corrupt_record", StringType)
val file = "/tmp/sample.json"
val dfFromFile = spark.read.schema(schema).json(file)
scala> dfFromFile.show(false)
+-----+---------------+
|field|_corrupt_record|
+-----+---------------+
|1 |null |
|2 |null |
|null |{"field": "3"} |
+-----+---------------+
scala> dfFromFile.filter($"_corrupt_record".isNotNull).count()
res1: Long = 0
scala> dfFromFile.filter($"_corrupt_record".isNull).count()
res2: Long = 3
```
When the `requiredSchema` only contains `_corrupt_record`, the derived
`actualSchema` (the required schema minus the corrupt record column) is empty,
so parsing never fails and `_corrupt_record` is null for every row. This PR
detects that situation and raises an exception with a reasonable workaround
message so that users know what happened and how to fix the query.
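
As a workaround (also spelled out in the migration guide change below), caching the parsed result materializes the corrupt records before column pruning applies. A minimal sketch against the sample file above, assuming the same Spark shell session; the expected counts match the test case added in this PR:

```scala
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("field", ByteType)
  .add("_corrupt_record", StringType)

// Caching parses the file once with the full schema, so later queries
// that reference only `_corrupt_record` still see the captured records.
val df = spark.read.schema(schema).json("/tmp/sample.json").cache()

df.filter($"_corrupt_record".isNotNull).count()  // expected: 1
df.filter($"_corrupt_record".isNull).count()     // expected: 2
```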
## How was this patch tested?
Added test case.
Author: Jen-Ming Chung <[email protected]>
Closes #18865 from jmchung/SPARK-21610.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6273a711
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6273a711
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6273a711
Branch: refs/heads/master
Commit: 6273a711b69139ef0210f59759030a0b4a26b118
Parents: 520d92a
Author: Jen-Ming Chung <[email protected]>
Authored: Sun Sep 10 17:26:43 2017 -0700
Committer: gatorsmile <[email protected]>
Committed: Sun Sep 10 17:26:43 2017 -0700
----------------------------------------------------------------------
docs/sql-programming-guide.md | 4 +++
.../datasources/json/JsonFileFormat.scala | 14 ++++++++++
.../execution/datasources/json/JsonSuite.scala | 29 ++++++++++++++++++++
3 files changed, 47 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6273a711/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 45ba4d1..0a8acbb 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1543,6 +1543,10 @@ options.
# Migration Guide
+## Upgrading From Spark SQL 2.2 to 2.3
+
+  - Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then send the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`.
+
## Upgrading From Spark SQL 2.1 to 2.2
  - Spark 2.1.1 introduced a new configuration key: `spark.sql.hive.caseSensitiveInferenceMode`. It had a default setting of `NEVER_INFER`, which kept behavior identical to 2.1.0. However, Spark 2.2.0 changes this setting's default value to `INFER_AND_SAVE` to restore compatibility with reading Hive metastore tables whose underlying file schema have mixed-case column names. With the `INFER_AND_SAVE` configuration value, on first access Spark will perform schema inference on any Hive metastore table for which it has not already saved an inferred schema. Note that schema inference can be a very time consuming operation for tables with thousands of partitions. If compatibility with mixed-case column names is not a concern, you can safely set `spark.sql.hive.caseSensitiveInferenceMode` to `NEVER_INFER` to avoid the initial overhead of schema inference. Note that with the new default `INFER_AND_SAVE` setting, the results of the schema inference are saved as a metastore key for future use. Therefore, the initial schema inference occurs only at a table's first access.
http://git-wip-us.apache.org/repos/asf/spark/blob/6273a711/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 53d62d8..b5ed6e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -113,6 +113,20 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
}
}
+    if (requiredSchema.length == 1 &&
+      requiredSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
+      throw new AnalysisException(
+        "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" +
+          "referenced columns only include the internal corrupt record column\n" +
+          s"(named ${parsedOptions.columnNameOfCorruptRecord} by default). For example:\n" +
+          "spark.read.schema(schema).json(file).filter($\"_corrupt_record\".isNotNull).count()\n" +
+          "and spark.read.schema(schema).json(file).select(\"_corrupt_record\").show().\n" +
+          "Instead, you can cache or save the parsed results and then send the same query.\n" +
+          "For example, val df = spark.read.schema(schema).json(file).cache() and then\n" +
+          "df.filter($\"_corrupt_record\".isNotNull).count()."
+      )
+    }
+
(file: PartitionedFile) => {
val parser = new JacksonParser(actualSchema, parsedOptions)
JsonDataSource(parsedOptions).readFile(
http://git-wip-us.apache.org/repos/asf/spark/blob/6273a711/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 0008954..8c8d41e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2034,4 +2034,33 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
}
}
+
+  test("SPARK-21610: Corrupt records are not handled properly when creating a dataframe " +
+    "from a file") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val data =
+        """{"field": 1}
+          |{"field": 2}
+          |{"field": "3"}""".stripMargin
+      Seq(data).toDF().repartition(1).write.text(path)
+      val schema = new StructType().add("field", ByteType).add("_corrupt_record", StringType)
+      // negative cases
+      val msg = intercept[AnalysisException] {
+        spark.read.schema(schema).json(path).select("_corrupt_record").collect()
+      }.getMessage
+      assert(msg.contains("only include the internal corrupt record column"))
+      intercept[catalyst.errors.TreeNodeException[_]] {
+        spark.read.schema(schema).json(path).filter($"_corrupt_record".isNotNull).count()
+      }
+      // workaround
+      val df = spark.read.schema(schema).json(path).cache()
+      assert(df.filter($"_corrupt_record".isNotNull).count() == 1)
+      assert(df.filter($"_corrupt_record".isNull).count() == 2)
+      checkAnswer(
+        df.select("_corrupt_record"),
+        Row(null) :: Row(null) :: Row("{\"field\": \"3\"}") :: Nil
+      )
+    }
+  }
}
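
Note that the new check compares against `parsedOptions.columnNameOfCorruptRecord` rather than the literal `_corrupt_record`, so a renamed corrupt record column is rejected the same way. A hedged sketch, where the column name `bad_record` is purely illustrative:

```scala
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("field", ByteType)
  .add("bad_record", StringType)  // illustrative custom corrupt-record column

// The per-reader option overrides the default corrupt record column name,
// so selecting only "bad_record" should hit the same AnalysisException.
val df = spark.read
  .schema(schema)
  .option("columnNameOfCorruptRecord", "bad_record")
  .json("/tmp/sample.json")

df.select("bad_record").show()  // expected to throw AnalysisException
```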