diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index a1805f57b1dcf..88f2286219525 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1694,7 +1694,7 @@ test_that("column functions", {
 
   # check for unparseable
   df <- as.DataFrame(list(list("a" = "")))
-  expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]]$a, NA)
+  expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA)
 
   # check if array type in string is correctly supported.
   jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index fce0b9a5f86a0..5d3d4c6ece39d 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -17,8 +17,6 @@ displayTitle: Spark SQL Upgrading Guide
 
  - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`.
 
-  - In Spark version 2.4 and earlier, the `from_json` function produces `null`s for JSON strings and JSON datasource skips the same independently of its mode if there is no valid root JSON token in its input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if specified schema is `key STRING, value INT`.
-
  - The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.
 
  - In Spark version 2.4 and earlier, users can create map values with map type key via built-in function like `CreateMap`, `MapFromArrays`, etc. Since Spark 3.0, it's not allowed to create map values with map type key with these built-in functions. Users can still read map values with map type key from data source or Java/Scala collections, though they are not very useful.
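
A brief illustration of the `mode` option described in the retained bullet above; this is a hedged sketch (the data and column names are invented for the example, and exact results depend on the Spark version in use), assuming a SparkSession named `spark`:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}
import spark.implicits._

val schema = new StructType().add("a", IntegerType)
// A missing ':' makes this record malformed.
val malformed = Seq("""{"a" 1}""").toDF("value")

// PERMISSIVE (the default) keeps the record; per the note above, Spark 3.0
// returns Row(null) for it rather than dropping the whole struct.
malformed.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))).show()

// FAILFAST instead raises an error on the first malformed record.
// malformed.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).show()
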
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index e0cab537ce1c6..3403349c8974e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -548,23 +548,15 @@ case class JsonToStructs(
       s"Input schema ${nullableSchema.catalogString} must be a struct, an 
array or a map.")
   }
 
-  @transient
-  private lazy val castRow = nullableSchema match {
-    case _: StructType => (row: InternalRow) => row
-    case _: ArrayType => (row: InternalRow) => row.getArray(0)
-    case _: MapType => (row: InternalRow) => row.getMap(0)
-  }
-
   // This converts parsed rows to the desired output by the given schema.
-  private def convertRow(rows: Iterator[InternalRow]) = {
-    if (rows.hasNext) {
-      val result = rows.next()
-      // JSON's parser produces one record only.
-      assert(!rows.hasNext)
-      castRow(result)
-    } else {
-      throw new IllegalArgumentException("Expected one row from JSON parser.")
-    }
+  @transient
+  lazy val converter = nullableSchema match {
+    case _: StructType =>
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
+    case _: ArrayType =>
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
+    case _: MapType =>
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
   }
 
   val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
@@ -600,7 +592,7 @@ case class JsonToStructs(
     copy(timeZoneId = Option(timeZoneId))
 
   override def nullSafeEval(json: Any): Any = {
-    convertRow(parser.parse(json.asInstanceOf[UTF8String]))
+    converter(parser.parse(json.asInstanceOf[UTF8String]))
   }
 
   override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
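
The reworked `converter` above folds row extraction and the empty-iterator case into a single function chosen once per schema type. The following standalone sketch shows the same dispatch pattern with simplified stand-in types (it is not the Catalyst API, just an illustration of the shape of the change):

// Simplified stand-ins for Catalyst's DataType and InternalRow.
sealed trait DataType
case object StructT extends DataType
case object ArrayT extends DataType
case object MapT extends DataType

final case class Row(values: Seq[Any]) {
  def getArray(i: Int): Seq[Any] = values(i).asInstanceOf[Seq[Any]]
  def getMap(i: Int): Map[Any, Any] = values(i).asInstanceOf[Map[Any, Any]]
}

// Pick the converter once, based on the target schema; each converter returns
// null when the parser produced no rows (e.g. input with no JSON token).
def makeConverter(schema: DataType): Iterator[Row] => Any = schema match {
  case StructT => rows => if (rows.hasNext) rows.next() else null
  case ArrayT  => rows => if (rows.hasNext) rows.next().getArray(0) else null
  case MapT    => rows => if (rows.hasNext) rows.next().getMap(0) else null
}

// A struct schema over an empty iterator yields null, matching the new
// nullSafeEval behaviour for empty parser output.
assert(makeConverter(StructT)(Iterator.empty) == null)
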
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 3f245e1400fa1..8cf758e26e29b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -399,7 +399,7 @@ class JacksonParser(
         // a null first token is equivalent to testing for input.trim.isEmpty
         // but it works on any token stream and not just strings
         parser.nextToken() match {
-          case null => throw new RuntimeException("Not found any JSON token")
+          case null => Nil
           case _ => rootConverter.apply(parser) match {
            case null => throw new RuntimeException("Root converter returned null")
             case rows => rows
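
The new `case null => Nil` branch relies on Jackson reporting no root token for whitespace-only input. A minimal standalone check of that assumption, using the Jackson streaming API directly (jackson-core is assumed to be on the classpath, as it already is for Spark):

import com.fasterxml.jackson.core.JsonFactory

val factory = new JsonFactory()

// Whitespace-only input has no root JSON token, so nextToken() returns null;
// with this patch such records are skipped instead of raising an error.
val blank = factory.createParser(" ")
assert(blank.nextToken() == null)

// A well-formed object yields a non-null root token (START_OBJECT).
val obj = factory.createParser("""{"a": 1}""")
assert(obj.nextToken() != null)
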
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 238e6e34b4ae5..b190d6f5caa1c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -548,7 +548,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
       JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId),
-      InternalRow(null)
+      null
     )
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 656da9fa01806..806f0b2239fe6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -240,6 +240,16 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       Seq(Row("1"), Row("2")))
   }
 
+  test("SPARK-11226 Skip empty line in json file") {
+    spark.read
+      .json(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "").toDS())
+      .createOrReplaceTempView("d")
+
+    checkAnswer(
+      sql("select count(1) from d"),
+      Seq(Row(3)))
+  }
+
   test("SPARK-8828 sum should return null if all input values are null") {
     checkAnswer(
       sql("select sum(a), avg(a) from allNulls"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 78debb5731116..49dd9c22e831b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1126,7 +1126,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         Row(null, null, null),
         Row(null, null, null),
         Row(null, null, null),
-        Row(null, null, null),
         Row("str_a_4", "str_b_4", "str_c_4"),
         Row(null, null, null))
     )
@@ -1148,7 +1147,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       checkAnswer(
         jsonDF.select($"a", $"b", $"c", $"_unparsed"),
         Row(null, null, null, "{") ::
-          Row(null, null, null, "") ::
           Row(null, null, null, """{"a":1, b:2}""") ::
           Row(null, null, null, """{"a":{, b:3}""") ::
           Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1163,7 +1161,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       checkAnswer(
         jsonDF.filter($"_unparsed".isNotNull).select($"_unparsed"),
         Row("{") ::
-          Row("") ::
           Row("""{"a":1, b:2}""") ::
           Row("""{"a":{, b:3}""") ::
           Row("]") :: Nil
@@ -1185,7 +1182,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     checkAnswer(
       jsonDF.selectExpr("a", "b", "c", "_malformed"),
       Row(null, null, null, "{") ::
-        Row(null, null, null, "") ::
         Row(null, null, null, """{"a":1, b:2}""") ::
         Row(null, null, null, """{"a":{, b:3}""") ::
         Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1727,7 +1723,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       val path = dir.getCanonicalPath
       primitiveFieldAndType
         .toDF("value")
-        .repartition(1)
         .write
         .option("compression", "GzIp")
         .text(path)
@@ -2428,7 +2423,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
 
     checkCount(2)
-    countForMalformedJSON(1, Seq(""))
+    countForMalformedJSON(0, Seq(""))
   }
 
   test("SPARK-25040: empty strings should be disallowed") {

