Repository: spark
Updated Branches:
refs/heads/master 4df51361a -> bd14da6fd
[SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support custom encoding for json
files
## What changes were proposed in this pull request?
I propose a new option for the JSON datasource which allows specifying the
encoding (charset) of input and output files. Here is an example of using the option:
```
spark.read.schema(schema)
.option("multiline", "true")
.option("encoding", "UTF-16LE")
.json(fileName)
```
If the option is not specified, the charset auto-detection mechanism is used by
default.
The option can also be used when saving datasets to JSON files. Currently Spark
can save datasets to JSON files in the `UTF-8` charset only. The changes allow
saving data in any supported charset. Here is the approximate list of charsets
supported by Oracle Java SE:
https://docs.oracle.com/javase/8/docs/technotes/guides/intl/encoding.doc.html .
A user can specify the charset of output JSON files via the `charset` option,
for example `.option("charset", "UTF-16BE")`. By default the output charset is
still `UTF-8` to keep backward compatibility.
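For instance, a dataset can be saved in `UTF-16BE` as follows (a sketch based
on the write path and tests added in this patch; the output path is
hypothetical):
```
df.write
  .option("encoding", "UTF-16BE") // "charset" is accepted as an alias
  .json("/tmp/json-utf16be")      // hypothetical output path
```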
The solution has the following restrictions for per-line mode (`multiline =
false`):
- If the charset is different from UTF-8, the `lineSep` option must be
specified (see the example after this list). The option is required because
Hadoop's LineReader cannot detect the line separator correctly in that case.
Here is the ticket for solving the issue:
https://issues.apache.org/jira/browse/SPARK-23725
- Encodings with a [BOM](https://en.wikipedia.org/wiki/Byte_order_mark) are not
supported. For example, the `UTF-16` and `UTF-32` encodings are blacklisted.
The problem can be solved by https://github.com/MaxGekk/spark-1/pull/2
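For example, reading per-line JSON in a non-UTF-8 charset requires an explicit
line separator (a sketch following the read-back tests in this patch; the path
is hypothetical):
```
spark.read.schema(schema)
  .option("encoding", "UTF-16LE")
  .option("lineSep", "\n") // required because the charset differs from UTF-8
  .json("/tmp/json-utf16le")
```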
## How was this patch tested?
I added the following tests:
- reading a JSON file in the `UTF-16LE` encoding with a BOM in `multiline` mode
- reading a JSON file using charset auto-detection (`UTF-32BE` with a BOM)
- reading a JSON file using the user's charset (`UTF-16LE`)
- saving in `UTF-32BE` and reading the result back with the standard library
  (not with Spark)
- checking that the default charset is `UTF-8`
- handling a wrong (unsupported) charset
Author: Maxim Gekk <[email protected]>
Author: Maxim Gekk <[email protected]>
Closes #20937 from MaxGekk/json-encoding-line-sep.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd14da6f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd14da6f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd14da6f
Branch: refs/heads/master
Commit: bd14da6fd5a77cc03efff193a84ffccbe892cc13
Parents: 4df5136
Author: Maxim Gekk <[email protected]>
Authored: Sun Apr 29 11:25:31 2018 +0800
Committer: hyukjinkwon <[email protected]>
Committed: Sun Apr 29 11:25:31 2018 +0800
----------------------------------------------------------------------
python/pyspark/sql/readwriter.py | 15 +-
python/pyspark/sql/tests.py | 7 +
.../test_support/sql/people_array_utf16le.json | Bin 0 -> 182 bytes
.../sql/catalyst/json/CreateJacksonParser.scala | 49 +++-
.../spark/sql/catalyst/json/JSONOptions.scala | 39 ++-
.../spark/sql/catalyst/json/JacksonParser.scala | 10 +-
.../org/apache/spark/sql/DataFrameReader.scala | 3 +
.../org/apache/spark/sql/DataFrameWriter.scala | 8 +-
.../datasources/json/JsonDataSource.scala | 60 +++--
.../datasources/json/JsonFileFormat.scala | 10 +-
.../datasources/text/TextOptions.scala | 18 +-
.../src/test/resources/test-data/utf16LE.json | Bin 0 -> 98 bytes
.../test/resources/test-data/utf16WithBOM.json | Bin 0 -> 200 bytes
.../resources/test-data/utf32BEWithBOM.json | Bin 0 -> 172 bytes
.../datasources/json/JsonBenchmarks.scala | 179 ++++++++++++++
.../execution/datasources/json/JsonSuite.scala | 245 ++++++++++++++++++-
16 files changed, 599 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/bd14da6f/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index df176c5..6811fa6 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -176,7 +176,8 @@ class DataFrameReader(OptionUtils):
allowComments=None, allowUnquotedFieldNames=None,
allowSingleQuotes=None,
allowNumericLeadingZero=None,
allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
timestampFormat=None,
- multiLine=None, allowUnquotedControlChars=None, lineSep=None,
samplingRatio=None):
+ multiLine=None, allowUnquotedControlChars=None, lineSep=None,
samplingRatio=None,
+ encoding=None):
"""
Loads JSON files and returns the results as a :class:`DataFrame`.
@@ -237,6 +238,10 @@ class DataFrameReader(OptionUtils):
:param allowUnquotedControlChars: allows JSON Strings to contain
unquoted control
characters (ASCII characters with
value less than 32,
including tab and line feed
characters) or not.
+ :param encoding: allows to forcibly set one of standard basic or
extended encoding for
+ the JSON files. For example UTF-16BE, UTF-32LE. If
None is set,
+ the encoding of input JSON will be detected
automatically
+ when the multiLine option is set to ``true``.
:param lineSep: defines the line separator that should be used for
parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
:param samplingRatio: defines fraction of input JSON objects used for
schema inferring.
@@ -259,7 +264,7 @@ class DataFrameReader(OptionUtils):
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord,
dateFormat=dateFormat,
timestampFormat=timestampFormat, multiLine=multiLine,
allowUnquotedControlChars=allowUnquotedControlChars,
lineSep=lineSep,
- samplingRatio=samplingRatio)
+ samplingRatio=samplingRatio, encoding=encoding)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -752,7 +757,7 @@ class DataFrameWriter(OptionUtils):
@since(1.4)
def json(self, path, mode=None, compression=None, dateFormat=None,
timestampFormat=None,
- lineSep=None):
+ lineSep=None, encoding=None):
"""Saves the content of the :class:`DataFrame` in JSON format
(`JSON Lines text format or newline-delimited JSON
<http://jsonlines.org/>`_) at the
specified path.
@@ -776,6 +781,8 @@ class DataFrameWriter(OptionUtils):
formats follow the formats at
``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is
set, it uses the
default value,
``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.
+ :param encoding: specifies encoding (charset) of saved json files. If
None is set,
+ the default UTF-8 charset will be used.
:param lineSep: defines the line separator that should be used for
writing. If None is
set, it uses the default value, ``\\n``.
@@ -784,7 +791,7 @@ class DataFrameWriter(OptionUtils):
self.mode(mode)
self._set_opts(
compression=compression, dateFormat=dateFormat,
timestampFormat=timestampFormat,
- lineSep=lineSep)
+ lineSep=lineSep, encoding=encoding)
self._jwrite.json(path)
@since(1.4)
http://git-wip-us.apache.org/repos/asf/spark/blob/bd14da6f/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 6b28c55..e0cd2aa 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -685,6 +685,13 @@ class SQLTests(ReusedSQLTestCase):
multiLine=True)
self.assertEqual(people1.collect(), people_array.collect())
+ def test_encoding_json(self):
+ people_array = self.spark.read\
+ .json("python/test_support/sql/people_array_utf16le.json",
+ multiLine=True, encoding="UTF-16LE")
+ expected = [Row(age=30, name=u'Andy'), Row(age=19, name=u'Justin')]
+ self.assertEqual(people_array.collect(), expected)
+
def test_linesep_json(self):
df = self.spark.read.json("python/test_support/sql/people.json",
lineSep=",")
expected = [Row(_corrupt_record=None, name=u'Michael'),
http://git-wip-us.apache.org/repos/asf/spark/blob/bd14da6f/python/test_support/sql/people_array_utf16le.json
----------------------------------------------------------------------
diff --git a/python/test_support/sql/people_array_utf16le.json
b/python/test_support/sql/people_array_utf16le.json
new file mode 100644
index 0000000..9c657fa
Binary files /dev/null and b/python/test_support/sql/people_array_utf16le.json
differ
http://git-wip-us.apache.org/repos/asf/spark/blob/bd14da6f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
index 025a388..3e8e6db 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
@@ -18,10 +18,14 @@
package org.apache.spark.sql.catalyst.json
import java.io.{ByteArrayInputStream, InputStream, InputStreamReader}
+import java.nio.channels.Channels
+import java.nio.charset.Charset
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.hadoop.io.Text
+import sun.nio.cs.StreamDecoder
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String
private[sql] object CreateJacksonParser extends Serializable {
@@ -43,7 +47,48 @@ private[sql] object CreateJacksonParser extends Serializable
{
jsonFactory.createParser(record.getBytes, 0, record.getLength)
}
- def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser =
{
- jsonFactory.createParser(record)
+ // Jackson parsers can be ranked according to their performance:
+ // 1. Array based with actual encoding UTF-8 in the array. This is the
fastest parser
+ // but it doesn't allow to set encoding explicitly. Actual encoding is
detected automatically
+ // by checking leading bytes of the array.
+ // 2. InputStream based with actual encoding UTF-8 in the stream. Encoding
is detected
+ // automatically by analyzing first bytes of the input stream.
+ // 3. Reader based parser. This is the slowest parser used here but it
allows to create
+ // a reader with specific encoding.
+ // The method creates a reader for an array with given encoding and sets
size of internal
+ // decoding buffer according to size of input array.
+ private def getStreamDecoder(enc: String, in: Array[Byte], length: Int):
StreamDecoder = {
+ val bais = new ByteArrayInputStream(in, 0, length)
+ val byteChannel = Channels.newChannel(bais)
+ val decodingBufferSize = Math.min(length, 8192)
+ val decoder = Charset.forName(enc).newDecoder()
+
+ StreamDecoder.forDecoder(byteChannel, decoder, decodingBufferSize)
+ }
+
+ def text(enc: String, jsonFactory: JsonFactory, record: Text): JsonParser = {
+ val sd = getStreamDecoder(enc, record.getBytes, record.getLength)
+ jsonFactory.createParser(sd)
+ }
+
+ def inputStream(jsonFactory: JsonFactory, is: InputStream): JsonParser = {
+ jsonFactory.createParser(is)
+ }
+
+ def inputStream(enc: String, jsonFactory: JsonFactory, is: InputStream):
JsonParser = {
+ jsonFactory.createParser(new InputStreamReader(is, enc))
+ }
+
+ def internalRow(jsonFactory: JsonFactory, row: InternalRow): JsonParser = {
+ val ba = row.getBinary(0)
+
+ jsonFactory.createParser(ba, 0, ba.length)
+ }
+
+ def internalRow(enc: String, jsonFactory: JsonFactory, row: InternalRow):
JsonParser = {
+ val binary = row.getBinary(0)
+ val sd = getStreamDecoder(enc, binary, binary.length)
+
+ jsonFactory.createParser(sd)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/bd14da6f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 5c9adc3..5f130af 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.json
-import java.nio.charset.StandardCharsets
+import java.nio.charset.{Charset, StandardCharsets}
import java.util.{Locale, TimeZone}
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
@@ -86,14 +86,43 @@ private[sql] class JSONOptions(
val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
+ /**
+ * A string between two consecutive JSON records.
+ */
val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
sep
}
- // Note that the option 'lineSep' uses a different default value in read and
write.
- val lineSeparatorInRead: Option[Array[Byte]] =
- lineSeparator.map(_.getBytes(StandardCharsets.UTF_8))
- // Note that JSON uses writer with UTF-8 charset. This string will be
written out as UTF-8.
+
+ /**
+ * Standard encoding (charset) name. For example UTF-8, UTF-16LE and
UTF-32BE.
+ * If the encoding is not specified (None), it will be detected automatically
+ * when the multiLine option is set to `true`.
+ */
+ val encoding: Option[String] = parameters.get("encoding")
+ .orElse(parameters.get("charset")).map { enc =>
+ // The following encodings are not supported in per-line mode (multiline
is false)
+ // because they cause some problems in reading files with BOM which is
supposed to
+ // present in the files with such encodings. After splitting input files
by lines,
+ // only the first lines will have the BOM which leads to impossibility
for reading
+ // the rest lines. Besides of that, the lineSep option must have the BOM
in such
+ // encodings which can never present between lines.
+ val blacklist = Seq(Charset.forName("UTF-16"), Charset.forName("UTF-32"))
+ val isBlacklisted = blacklist.contains(Charset.forName(enc))
+ require(multiLine || !isBlacklisted,
+ s"""The ${enc} encoding must not be included in the blacklist when
multiLine is disabled:
+ | ${blacklist.mkString(", ")}""".stripMargin)
+
+ val isLineSepRequired = !(multiLine == false &&
+ Charset.forName(enc) != StandardCharsets.UTF_8 &&
lineSeparator.isEmpty)
+ require(isLineSepRequired, s"The lineSep option must be specified for
the $enc encoding")
+
+ enc
+ }
+
+ val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
+ lineSep.getBytes(encoding.getOrElse("UTF-8"))
+ }
val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n")
/** Sets config options on a Jackson [[JsonFactory]]. */
http://git-wip-us.apache.org/repos/asf/spark/blob/bd14da6f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 7f69569..a5a4a13 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.json
-import java.io.ByteArrayOutputStream
+import java.io.{ByteArrayOutputStream, CharConversionException}
import scala.collection.mutable.ArrayBuffer
import scala.util.Try
@@ -361,6 +361,14 @@ class JacksonParser(
// For such records, all fields other than the field configured by
// `columnNameOfCorruptRecord` are set to `null`.
throw BadRecordException(() => recordLiteral(record), () => None, e)
+ case e: CharConversionException if options.encoding.isEmpty =>
+ val msg =
+ """JSON parser cannot handle a character in its input.
+ |Specifying encoding as an input option explicitly might help to
resolve the issue.
+ |""".stripMargin + e.getMessage
+ val wrappedCharException = new CharConversionException(msg)
+ wrappedCharException.initCause(e)
+ throw BadRecordException(() => recordLiteral(record), () => None,
wrappedCharException)
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/bd14da6f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b44552f..6b2ea6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -372,6 +372,9 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
* <li>`multiLine` (default `false`): parse one record, which may span
multiple lines,
* per file</li>
+ * <li>`encoding` (by default it is not set): allows to forcibly set one of
standard basic
+ * or extended encoding for the JSON files. For example UTF-16BE, UTF-32LE.
If the encoding
+ * is not specified and `multiLine` is set to `true`, it will be detected
automatically.</li>
* <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the
line separator
* that should be used for parsing.</li>
* <li>`samplingRatio` (default is 1.0): defines fraction of input JSON
objects used
http://git-wip-us.apache.org/repos/asf/spark/blob/bd14da6f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index bbc0631..e183fa6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -518,8 +518,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T])
{
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`): sets the
string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
- * <li>`lineSep` (default `\n`): defines the line separator that should
- * be used for writing.</li>
+ * <li>`encoding` (by default it is not set): specifies encoding (charset)
of saved json
+ * files. If it is not set, the UTF-8 charset will be used. </li>
+ * <li>`lineSep` (default `\n`): defines the line separator that should be
used for writing.</li>
* </ul>
*
* @since 1.4.0
@@ -589,8 +590,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T])
{
* <li>`compression` (default `null`): compression codec to use when saving
to file. This can be
* one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`,
`lz4`,
* `snappy` and `deflate`). </li>
- * <li>`lineSep` (default `\n`): defines the line separator that should
- * be used for writing.</li>
+ * <li>`lineSep` (default `\n`): defines the line separator that should be
used for writing.</li>
* </ul>
*
* @since 1.6.0
http://git-wip-us.apache.org/repos/asf/spark/blob/bd14da6f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 5769c09..983a5f0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -31,11 +31,11 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.TaskContext
import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
import org.apache.spark.rdd.{BinaryFileRDD, RDD}
-import org.apache.spark.sql.{AnalysisException, Dataset, Encoders,
SparkSession}
+import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser,
JSONOptions}
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.text.{TextFileFormat,
TextOptions}
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -92,26 +92,30 @@ object TextInputJsonDataSource extends JsonDataSource {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): StructType = {
- val json: Dataset[String] = createBaseDataset(
- sparkSession, inputPaths, parsedOptions.lineSeparator)
+ val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths,
parsedOptions)
+
inferFromDataset(json, parsedOptions)
}
def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions):
StructType = {
val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions)
- val rdd: RDD[UTF8String] =
sampled.queryExecution.toRdd.map(_.getUTF8String(0))
- JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String)
+ val rdd: RDD[InternalRow] = sampled.queryExecution.toRdd
+ val rowParser = parsedOptions.encoding.map { enc =>
+ CreateJacksonParser.internalRow(enc, _: JsonFactory, _: InternalRow)
+ }.getOrElse(CreateJacksonParser.internalRow(_: JsonFactory, _:
InternalRow))
+
+ JsonInferSchema.infer(rdd, parsedOptions, rowParser)
}
private def createBaseDataset(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
- lineSeparator: Option[String]): Dataset[String] = {
- val textOptions = lineSeparator.map { lineSep =>
- Map(TextOptions.LINE_SEPARATOR -> lineSep)
- }.getOrElse(Map.empty[String, String])
-
+ parsedOptions: JSONOptions): Dataset[String] = {
val paths = inputPaths.map(_.getPath.toString)
+ val textOptions = Map.empty[String, String] ++
+ parsedOptions.encoding.map("encoding" -> _) ++
+ parsedOptions.lineSeparator.map("lineSep" -> _)
+
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
@@ -129,8 +133,12 @@ object TextInputJsonDataSource extends JsonDataSource {
schema: StructType): Iterator[InternalRow] = {
val linesReader = new HadoopFileLinesReader(file,
parser.options.lineSeparatorInRead, conf)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ =>
linesReader.close()))
+ val textParser = parser.options.encoding
+ .map(enc => CreateJacksonParser.text(enc, _: JsonFactory, _: Text))
+ .getOrElse(CreateJacksonParser.text(_: JsonFactory, _: Text))
+
val safeParser = new FailureSafeParser[Text](
- input => parser.parse(input, CreateJacksonParser.text, textToUTF8String),
+ input => parser.parse(input, textParser, textToUTF8String),
parser.options.parseMode,
schema,
parser.options.columnNameOfCorruptRecord)
@@ -153,7 +161,11 @@ object MultiLineJsonDataSource extends JsonDataSource {
parsedOptions: JSONOptions): StructType = {
val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths)
val sampled: RDD[PortableDataStream] = JsonUtils.sample(json,
parsedOptions)
- JsonInferSchema.infer(sampled, parsedOptions, createParser)
+ val parser = parsedOptions.encoding
+ .map(enc => createParser(enc, _: JsonFactory, _: PortableDataStream))
+ .getOrElse(createParser(_: JsonFactory, _: PortableDataStream))
+
+ JsonInferSchema.infer[PortableDataStream](sampled, parsedOptions, parser)
}
private def createBaseRdd(
@@ -175,11 +187,18 @@ object MultiLineJsonDataSource extends JsonDataSource {
.values
}
- private def createParser(jsonFactory: JsonFactory, record:
PortableDataStream): JsonParser = {
- val path = new Path(record.getPath())
- CreateJacksonParser.inputStream(
- jsonFactory,
- CodecStreams.createInputStreamWithCloseResource(record.getConfiguration,
path))
+ private def dataToInputStream(dataStream: PortableDataStream): InputStream =
{
+ val path = new Path(dataStream.getPath())
+
CodecStreams.createInputStreamWithCloseResource(dataStream.getConfiguration,
path)
+ }
+
+ private def createParser(jsonFactory: JsonFactory, stream:
PortableDataStream): JsonParser = {
+ CreateJacksonParser.inputStream(jsonFactory, dataToInputStream(stream))
+ }
+
+ private def createParser(enc: String, jsonFactory: JsonFactory,
+ stream: PortableDataStream): JsonParser = {
+ CreateJacksonParser.inputStream(enc, jsonFactory,
dataToInputStream(stream))
}
override def readFile(
@@ -194,9 +213,12 @@ object MultiLineJsonDataSource extends JsonDataSource {
UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
}
}
+ val streamParser = parser.options.encoding
+ .map(enc => CreateJacksonParser.inputStream(enc, _: JsonFactory, _:
InputStream))
+ .getOrElse(CreateJacksonParser.inputStream(_: JsonFactory, _:
InputStream))
val safeParser = new FailureSafeParser[InputStream](
- input => parser.parse(input, CreateJacksonParser.inputStream,
partitionedFileString),
+ input => parser.parse[InputStream](input, streamParser,
partitionedFileString),
parser.options.parseMode,
schema,
parser.options.columnNameOfCorruptRecord)
http://git-wip-us.apache.org/repos/asf/spark/blob/bd14da6f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 0862c74..3b04510 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources.json
+import java.nio.charset.{Charset, StandardCharsets}
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -151,7 +153,13 @@ private[json] class JsonOutputWriter(
context: TaskAttemptContext)
extends OutputWriter with Logging {
- private val writer = CodecStreams.createOutputStreamWriter(context, new
Path(path))
+ private val encoding = options.encoding match {
+ case Some(charsetName) => Charset.forName(charsetName)
+ case None => StandardCharsets.UTF_8
+ }
+
+ private val writer = CodecStreams.createOutputStreamWriter(
+ context, new Path(path), encoding)
// create the Generator without separator inserted between 2 records
private[this] val gen = new JacksonGenerator(dataSchema, writer, options)
http://git-wip-us.apache.org/repos/asf/spark/blob/bd14da6f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
index 5c1a354..e4e2019 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.datasources.text
-import java.nio.charset.StandardCharsets
+import java.nio.charset.{Charset, StandardCharsets}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
CompressionCodecs}
@@ -41,13 +41,18 @@ private[text] class TextOptions(@transient private val
parameters: CaseInsensiti
*/
val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
- private val lineSeparator: Option[String] =
parameters.get(LINE_SEPARATOR).map { sep =>
- require(sep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
- sep
+ val encoding: Option[String] = parameters.get(ENCODING)
+
+ val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map {
lineSep =>
+ require(lineSep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
+
+ lineSep
}
+
// Note that the option 'lineSep' uses a different default value in read and
write.
- val lineSeparatorInRead: Option[Array[Byte]] =
- lineSeparator.map(_.getBytes(StandardCharsets.UTF_8))
+ val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
+
lineSep.getBytes(encoding.map(Charset.forName(_)).getOrElse(StandardCharsets.UTF_8))
+ }
val lineSeparatorInWrite: Array[Byte] =
lineSeparatorInRead.getOrElse("\n".getBytes(StandardCharsets.UTF_8))
}
@@ -55,5 +60,6 @@ private[text] class TextOptions(@transient private val
parameters: CaseInsensiti
private[datasources] object TextOptions {
val COMPRESSION = "compression"
val WHOLETEXT = "wholetext"
+ val ENCODING = "encoding"
val LINE_SEPARATOR = "lineSep"
}
http://git-wip-us.apache.org/repos/asf/spark/blob/bd14da6f/sql/core/src/test/resources/test-data/utf16LE.json
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/test-data/utf16LE.json
b/sql/core/src/test/resources/test-data/utf16LE.json
new file mode 100644
index 0000000..ce4117f
Binary files /dev/null and b/sql/core/src/test/resources/test-data/utf16LE.json
differ
http://git-wip-us.apache.org/repos/asf/spark/blob/bd14da6f/sql/core/src/test/resources/test-data/utf16WithBOM.json
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/test-data/utf16WithBOM.json
b/sql/core/src/test/resources/test-data/utf16WithBOM.json
new file mode 100644
index 0000000..cf4d293
Binary files /dev/null and
b/sql/core/src/test/resources/test-data/utf16WithBOM.json differ
http://git-wip-us.apache.org/repos/asf/spark/blob/bd14da6f/sql/core/src/test/resources/test-data/utf32BEWithBOM.json
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/test-data/utf32BEWithBOM.json
b/sql/core/src/test/resources/test-data/utf32BEWithBOM.json
new file mode 100644
index 0000000..6c7733c
Binary files /dev/null and
b/sql/core/src/test/resources/test-data/utf32BEWithBOM.json differ
http://git-wip-us.apache.org/repos/asf/spark/blob/bd14da6f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala
new file mode 100644
index 0000000..85cf054
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.json
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.{LongType, StringType, StructType}
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+ * The benchmarks aims to measure performance of JSON parsing when encoding is
set and isn't.
+ * To run this:
+ * spark-submit --class <this class> --jars <spark sql test jar>
+ */
+object JSONBenchmarks {
+ val conf = new SparkConf()
+
+ val spark = SparkSession.builder
+ .master("local[1]")
+ .appName("benchmark-json-datasource")
+ .config(conf)
+ .getOrCreate()
+ import spark.implicits._
+
+ def withTempPath(f: File => Unit): Unit = {
+ val path = Utils.createTempDir()
+ path.delete()
+ try f(path) finally Utils.deleteRecursively(path)
+ }
+
+
+ def schemaInferring(rowsNum: Int): Unit = {
+ val benchmark = new Benchmark("JSON schema inferring", rowsNum)
+
+ withTempPath { path =>
+ // scalastyle:off println
+ benchmark.out.println("Preparing data for benchmarking ...")
+ // scalastyle:on println
+
+ spark.sparkContext.range(0, rowsNum, 1)
+ .map(_ => "a")
+ .toDF("fieldA")
+ .write
+ .option("encoding", "UTF-8")
+ .json(path.getAbsolutePath)
+
+ benchmark.addCase("No encoding", 3) { _ =>
+ spark.read.json(path.getAbsolutePath)
+ }
+
+ benchmark.addCase("UTF-8 is set", 3) { _ =>
+ spark.read
+ .option("encoding", "UTF-8")
+ .json(path.getAbsolutePath)
+ }
+
+ /*
+ Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
+
+ JSON schema inferring: Best/Avg Time(ms) Rate(M/s)
Per Row(ns) Relative
+
--------------------------------------------------------------------------------------------
+ No encoding 38902 / 39282 2.6
389.0 1.0X
+ UTF-8 is set 56959 / 57261 1.8
569.6 0.7X
+ */
+ benchmark.run()
+ }
+ }
+
+ def perlineParsing(rowsNum: Int): Unit = {
+ val benchmark = new Benchmark("JSON per-line parsing", rowsNum)
+
+ withTempPath { path =>
+ // scalastyle:off println
+ benchmark.out.println("Preparing data for benchmarking ...")
+ // scalastyle:on println
+
+ spark.sparkContext.range(0, rowsNum, 1)
+ .map(_ => "a")
+ .toDF("fieldA")
+ .write.json(path.getAbsolutePath)
+ val schema = new StructType().add("fieldA", StringType)
+
+ benchmark.addCase("No encoding", 3) { _ =>
+ spark.read
+ .schema(schema)
+ .json(path.getAbsolutePath)
+ .count()
+ }
+
+ benchmark.addCase("UTF-8 is set", 3) { _ =>
+ spark.read
+ .option("encoding", "UTF-8")
+ .schema(schema)
+ .json(path.getAbsolutePath)
+ .count()
+ }
+
+ /*
+ Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
+
+ JSON per-line parsing: Best/Avg Time(ms) Rate(M/s)
Per Row(ns) Relative
+
--------------------------------------------------------------------------------------------
+ No encoding 25947 / 26188 3.9
259.5 1.0X
+ UTF-8 is set 46319 / 46417 2.2
463.2 0.6X
+ */
+ benchmark.run()
+ }
+ }
+
+ def perlineParsingOfWideColumn(rowsNum: Int): Unit = {
+ val benchmark = new Benchmark("JSON parsing of wide lines", rowsNum)
+
+ withTempPath { path =>
+ // scalastyle:off println
+ benchmark.out.println("Preparing data for benchmarking ...")
+ // scalastyle:on println
+
+ spark.sparkContext.range(0, rowsNum, 1)
+ .map { i =>
+ val s = "abcdef0123456789ABCDEF" * 20
+ s"""{"a":"$s","b":
$i,"c":"$s","d":$i,"e":"$s","f":$i,"x":"$s","y":$i,"z":"$s"}"""
+ }
+ .toDF().write.text(path.getAbsolutePath)
+ val schema = new StructType()
+ .add("a", StringType).add("b", LongType)
+ .add("c", StringType).add("d", LongType)
+ .add("e", StringType).add("f", LongType)
+ .add("x", StringType).add("y", LongType)
+ .add("z", StringType)
+
+ benchmark.addCase("No encoding", 3) { _ =>
+ spark.read
+ .schema(schema)
+ .json(path.getAbsolutePath)
+ .count()
+ }
+
+ benchmark.addCase("UTF-8 is set", 3) { _ =>
+ spark.read
+ .option("encoding", "UTF-8")
+ .schema(schema)
+ .json(path.getAbsolutePath)
+ .count()
+ }
+
+ /*
+ Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
+
+ JSON parsing of wide lines: Best/Avg Time(ms) Rate(M/s)
Per Row(ns) Relative
+
--------------------------------------------------------------------------------------------
+ No encoding 45543 / 45660 0.2
4554.3 1.0X
+ UTF-8 is set 65737 / 65957 0.2
6573.7 0.7X
+ */
+ benchmark.run()
+ }
+ }
+
+ def main(args: Array[String]): Unit = {
+ schemaInferring(100 * 1000 * 1000)
+ perlineParsing(100 * 1000 * 1000)
+ perlineParsingOfWideColumn(10 * 1000 * 1000)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/bd14da6f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index a58dff8..0db688f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -17,8 +17,8 @@
package org.apache.spark.sql.execution.datasources.json
-import java.io.{File, StringWriter}
-import java.nio.charset.StandardCharsets
+import java.io.{File, FileOutputStream, StringWriter}
+import java.nio.charset.{StandardCharsets, UnsupportedCharsetException}
import java.nio.file.{Files, Paths, StandardOpenOption}
import java.sql.{Date, Timestamp}
import java.util.Locale
@@ -48,6 +48,10 @@ class TestFileFilter extends PathFilter {
class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
import testImplicits._
+ def testFile(fileName: String): String = {
+ Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+ }
+
test("Type promotion") {
def checkTypePromotion(expected: Any, actual: Any) {
assert(expected.getClass == actual.getClass,
@@ -2167,4 +2171,241 @@ class JsonSuite extends QueryTest with SharedSQLContext
with TestJsonData {
val sampled = spark.read.option("samplingRatio", 1.0).json(ds)
assert(sampled.count() == ds.count())
}
+
+ test("SPARK-23723: json in UTF-16 with BOM") {
+ val fileName = "test-data/utf16WithBOM.json"
+ val schema = new StructType().add("firstName", StringType).add("lastName",
StringType)
+ val jsonDF = spark.read.schema(schema)
+ .option("multiline", "true")
+ .option("encoding", "UTF-16")
+ .json(testFile(fileName))
+
+ checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood")))
+ }
+
+ test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+ val fileName = "test-data/utf32BEWithBOM.json"
+ val schema = new StructType().add("firstName", StringType).add("lastName",
StringType)
+ val jsonDF = spark.read.schema(schema)
+ .option("multiline", "true")
+ .json(testFile(fileName))
+
+ checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+ }
+
+ test("SPARK-23723: Use user's encoding in reading of multi-line json in
UTF-16LE") {
+ val fileName = "test-data/utf16LE.json"
+ val schema = new StructType().add("firstName", StringType).add("lastName",
StringType)
+ val jsonDF = spark.read.schema(schema)
+ .option("multiline", "true")
+ .options(Map("encoding" -> "UTF-16LE"))
+ .json(testFile(fileName))
+
+ checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+ }
+
+ test("SPARK-23723: Unsupported encoding name") {
+ val invalidCharset = "UTF-128"
+ val exception = intercept[UnsupportedCharsetException] {
+ spark.read
+ .options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
+ .json(testFile("test-data/utf16LE.json"))
+ .count()
+ }
+
+ assert(exception.getMessage.contains(invalidCharset))
+ }
+
+ test("SPARK-23723: checking that the encoding option is case agnostic") {
+ val fileName = "test-data/utf16LE.json"
+ val schema = new StructType().add("firstName", StringType).add("lastName",
StringType)
+ val jsonDF = spark.read.schema(schema)
+ .option("multiline", "true")
+ .options(Map("encoding" -> "uTf-16lE"))
+ .json(testFile(fileName))
+
+ checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+ }
+
+
+ test("SPARK-23723: specified encoding is not matched to actual encoding") {
+ val fileName = "test-data/utf16LE.json"
+ val schema = new StructType().add("firstName", StringType).add("lastName",
StringType)
+ val exception = intercept[SparkException] {
+ spark.read.schema(schema)
+ .option("mode", "FAILFAST")
+ .option("multiline", "true")
+ .options(Map("encoding" -> "UTF-16BE"))
+ .json(testFile(fileName))
+ .count()
+ }
+ val errMsg = exception.getMessage
+
+ assert(errMsg.contains("Malformed records are detected in record parsing"))
+ }
+
+ def checkEncoding(expectedEncoding: String, pathToJsonFiles: String,
+ expectedContent: String): Unit = {
+ val jsonFiles = new File(pathToJsonFiles)
+ .listFiles()
+ .filter(_.isFile)
+ .filter(_.getName.endsWith("json"))
+ val actualContent = jsonFiles.map { file =>
+ new String(Files.readAllBytes(file.toPath), expectedEncoding)
+ }.mkString.trim
+
+ assert(actualContent == expectedContent)
+ }
+
+ test("SPARK-23723: save json in UTF-32BE") {
+ val encoding = "UTF-32BE"
+ withTempPath { path =>
+ val df = spark.createDataset(Seq(("Dog", 42)))
+ df.write
+ .options(Map("encoding" -> encoding, "lineSep" -> "\n"))
+ .json(path.getCanonicalPath)
+
+ checkEncoding(
+ expectedEncoding = encoding,
+ pathToJsonFiles = path.getCanonicalPath,
+ expectedContent = """{"_1":"Dog","_2":42}""")
+ }
+ }
+
+ test("SPARK-23723: save json in default encoding - UTF-8") {
+ withTempPath { path =>
+ val df = spark.createDataset(Seq(("Dog", 42)))
+ df.write.json(path.getCanonicalPath)
+
+ checkEncoding(
+ expectedEncoding = "UTF-8",
+ pathToJsonFiles = path.getCanonicalPath,
+ expectedContent = """{"_1":"Dog","_2":42}""")
+ }
+ }
+
+ test("SPARK-23723: wrong output encoding") {
+ val encoding = "UTF-128"
+ val exception = intercept[UnsupportedCharsetException] {
+ withTempPath { path =>
+ val df = spark.createDataset(Seq((0)))
+ df.write
+ .options(Map("encoding" -> encoding, "lineSep" -> "\n"))
+ .json(path.getCanonicalPath)
+ }
+ }
+
+ assert(exception.getMessage == encoding)
+ }
+
+ test("SPARK-23723: read back json in UTF-16LE") {
+ val options = Map("encoding" -> "UTF-16LE", "lineSep" -> "\n")
+ withTempPath { path =>
+ val ds = spark.createDataset(Seq(("a", 1), ("b", 2), ("c",
3))).repartition(2)
+ ds.write.options(options).json(path.getCanonicalPath)
+
+ val readBack = spark
+ .read
+ .options(options)
+ .json(path.getCanonicalPath)
+
+ checkAnswer(readBack.toDF(), ds.toDF())
+ }
+ }
+
+ def checkReadJson(lineSep: String, encoding: String, inferSchema: Boolean,
id: Int): Unit = {
+ test(s"SPARK-23724: checks reading json in ${encoding} #${id}") {
+ val schema = new StructType().add("f1", StringType).add("f2",
IntegerType)
+ withTempPath { path =>
+ val records = List(("a", 1), ("b", 2))
+ val data = records
+ .map(rec => s"""{"f1":"${rec._1}",
"f2":${rec._2}}""".getBytes(encoding))
+ .reduce((a1, a2) => a1 ++ lineSep.getBytes(encoding) ++ a2)
+ val os = new FileOutputStream(path)
+ os.write(data)
+ os.close()
+ val reader = if (inferSchema) {
+ spark.read
+ } else {
+ spark.read.schema(schema)
+ }
+ val readBack = reader
+ .option("encoding", encoding)
+ .option("lineSep", lineSep)
+ .json(path.getCanonicalPath)
+ checkAnswer(readBack, records.map(rec => Row(rec._1, rec._2)))
+ }
+ }
+ }
+
+ // scalastyle:off nonascii
+ List(
+ (0, "|", "UTF-8", false),
+ (1, "^", "UTF-16BE", true),
+ (2, "::", "ISO-8859-1", true),
+ (3, "!!!@3", "UTF-32LE", false),
+ (4, 0x1E.toChar.toString, "UTF-8", true),
+ (5, "아", "UTF-32BE", false),
+ (6, "куку", "CP1251", true),
+ (7, "sep", "utf-8", false),
+ (8, "\r\n", "UTF-16LE", false),
+ (9, "\r\n", "utf-16be", true),
+ (10, "\u000d\u000a", "UTF-32BE", false),
+ (11, "\u000a\u000d", "UTF-8", true),
+ (12, "===", "US-ASCII", false),
+ (13, "$^+", "utf-32le", true)
+ ).foreach {
+ case (testNum, sep, encoding, inferSchema) => checkReadJson(sep, encoding,
inferSchema, testNum)
+ }
+ // scalastyle:on nonascii
+
+ test("SPARK-23724: lineSep should be set if encoding if different from
UTF-8") {
+ val encoding = "UTF-16LE"
+ val exception = intercept[IllegalArgumentException] {
+ spark.read
+ .options(Map("encoding" -> encoding))
+ .json(testFile("test-data/utf16LE.json"))
+ .count()
+ }
+
+ assert(exception.getMessage.contains(
+ s"""The lineSep option must be specified for the $encoding encoding"""))
+ }
+
+ private val badJson = "\u0000\u0000\u0000A\u0001AAA"
+
+ test("SPARK-23094: permissively read JSON file with leading nulls when
multiLine is enabled") {
+ withTempPath { tempDir =>
+ val path = tempDir.getAbsolutePath
+ Seq(badJson + """{"a":1}""").toDS().write.text(path)
+ val expected = s"""${badJson}{"a":1}\n"""
+ val schema = new StructType().add("a",
IntegerType).add("_corrupt_record", StringType)
+ val df = spark.read.format("json")
+ .option("mode", "PERMISSIVE")
+ .option("multiLine", true)
+ .option("encoding", "UTF-8")
+ .schema(schema).load(path)
+ checkAnswer(df, Row(null, expected))
+ }
+ }
+
+ test("SPARK-23094: permissively read JSON file with leading nulls when
multiLine is disabled") {
+ withTempPath { tempDir =>
+ val path = tempDir.getAbsolutePath
+ Seq(badJson, """{"a":1}""").toDS().write.text(path)
+ val schema = new StructType().add("a",
IntegerType).add("_corrupt_record", StringType)
+ val df = spark.read.format("json")
+ .option("mode", "PERMISSIVE")
+ .option("multiLine", false)
+ .option("encoding", "UTF-8")
+ .schema(schema).load(path)
+ checkAnswer(df, Seq(Row(1, null), Row(null, badJson)))
+ }
+ }
+
+ test("SPARK-23094: permissively parse a dataset contains JSON with leading
nulls") {
+ checkAnswer(
+ spark.read.option("mode", "PERMISSIVE").option("encoding",
"UTF-8").json(Seq(badJson).toDS()),
+ Row(badJson))
+ }
}