Repository: spark
Updated Branches:
refs/heads/master e1f986c7a -> 9e204c62c
[SPARK-15840][SQL] Add two missing options in documentation and some option
related changes
## What changes were proposed in this pull request?
This PR
1. Adds documentation for the missing options `inferSchema` and
`mergeSchema`, for both Python and Scala.
2. Fixes `[[DataFrame]]` to ```:class:`DataFrame` ``` so that it is rendered
with a class link in the Python API documentation
(please refer to [the latest
documentation](https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/api/python/pyspark.sql.html))
3. Moves the `mergeSchema` option to `ParquetOptions` and removes the unused
options `metastoreSchema` and `metastoreTableName`.
They are no longer used. They were removed in
https://github.com/apache/spark/commit/e720dda42e806229ccfd970055c7b8a93eb447bf
and no use cases remain, as the search below shows:
```bash
grep -r -e METASTORE_SCHEMA -e \"metastoreSchema\" -e \"metastoreTableName\" \
  -e METASTORE_TABLE_NAME .
```
```
./sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:
private[sql] val METASTORE_SCHEMA = "metastoreSchema"
./sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:
private[sql] val METASTORE_TABLE_NAME = "metastoreTableName"
./sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala:
ParquetFileFormat.METASTORE_TABLE_NAME -> TableIdentifier(
```
The last match only sets `metastoreTableName`; the table name itself is never
used.
4. Sets the correct default values (in the documentation) for the `compression`
option for ORC (`snappy`, see
[OrcOptions.scala#L33-L42](https://github.com/apache/spark/blob/3ded5bc4db2badc9ff49554e73421021d854306b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala#L33-L42))
and Parquet (`the value specified in SQLConf`, see
[ParquetOptions.scala#L38-L47](https://github.com/apache/spark/blob/3ded5bc4db2badc9ff49554e73421021d854306b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala#L38-L47)),
and for `columnNameOfCorruptRecord` for JSON (`the value specified in SQLConf`, see
[JsonFileFormat.scala#L53-L55](https://github.com/apache/spark/blob/4538443e276597530a27c6922e48503677b13956/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala#L53-L55)
and
[JsonFileFormat.scala#L105-L106](https://github.com/apache/spark/blob/4538443e276597530a27c6922e48503677b13956/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala#L105-L106)).
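The option handling touched by this change can be sketched in plain Python. This is a rough illustration under stated assumptions, not Spark code: `resolve_merge_schema`, `csv_reader_options`, and the `SESSION_CONF` dict are hypothetical stand-ins for `ParquetOptions.mergeSchema`, the `is not None` checks in the Python `DataFrameReader.csv`, and `SQLConf`. Only the option names and the `spark.sql.parquet.mergeSchema` key come from the patch.

```python
from typing import Dict, Optional

# Hypothetical stand-in for the session's SQLConf (assumption for illustration).
SESSION_CONF: Dict[str, str] = {"spark.sql.parquet.mergeSchema": "false"}


def resolve_merge_schema(parameters: Dict[str, str], conf: Dict[str, str]) -> bool:
    """Use the per-read `mergeSchema` option if given, else the session value."""
    raw: Optional[str] = parameters.get("mergeSchema")
    if raw is None:
        raw = conf["spark.sql.parquet.mergeSchema"]
    lowered = raw.strip().lower()
    if lowered not in ("true", "false"):
        # Loosely mirrors a strict boolean parse: reject anything else.
        raise ValueError("not a boolean: %r" % raw)
    return lowered == "true"


def csv_reader_options(header=None, inferSchema=None) -> Dict[str, object]:
    """Forward only the options the caller set; None means 'use the default'."""
    options: Dict[str, object] = {}
    if header is not None:
        options["header"] = header
    if inferSchema is not None:
        options["inferSchema"] = inferSchema
    return options
```

With these helpers, `resolve_merge_schema({}, SESSION_CONF)` falls back to the session setting, while passing `{"mergeSchema": "true"}` overrides it for that read only. Likewise, `csv_reader_options()` forwards nothing, so the reader's documented defaults (for example `inferSchema` being `false`) apply.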
## How was this patch tested?
Existing tests should cover this.
Author: hyukjinkwon <[email protected]>
Author: Hyukjin Kwon <[email protected]>
Closes #13576 from HyukjinKwon/SPARK-15840.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e204c62
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e204c62
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e204c62
Branch: refs/heads/master
Commit: 9e204c62c6800e03759e04ef68268105d4b86bf2
Parents: e1f986c
Author: hyukjinkwon <[email protected]>
Authored: Sat Jun 11 23:20:40 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Sat Jun 11 23:20:40 2016 -0700
----------------------------------------------------------------------
python/pyspark/sql/readwriter.py | 40 +++++++++++++-------
.../org/apache/spark/sql/DataFrameReader.scala | 18 ++++++---
.../org/apache/spark/sql/DataFrameWriter.scala | 11 +++---
.../datasources/parquet/ParquetFileFormat.scala | 19 ++--------
.../datasources/parquet/ParquetOptions.scala | 15 +++++++-
.../spark/sql/hive/HiveMetastoreCatalog.scala | 12 ++----
6 files changed, 65 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9e204c62/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 7d1f186..f3182b2 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -209,7 +209,8 @@ class DataFrameReader(object):
:param columnNameOfCorruptRecord: allows renaming the new field having
malformed string
created by ``PERMISSIVE`` mode. This
overrides
``spark.sql.columnNameOfCorruptRecord``. If None is set,
- it uses the default value
``_corrupt_record``.
+ it uses the value specified in
+
``spark.sql.columnNameOfCorruptRecord``.
>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
@@ -276,6 +277,11 @@ class DataFrameReader(object):
def parquet(self, *paths):
"""Loads a Parquet file, returning the result as a :class:`DataFrame`.
+ You can set the following Parquet-specific option(s) for reading
Parquet files:
+ * ``mergeSchema``: sets whether we should merge schemas collected
from all \
+ Parquet part-files. This will override
``spark.sql.parquet.mergeSchema``. \
+ The default value is specified in
``spark.sql.parquet.mergeSchema``.
+
>>> df =
spark.read.parquet('python/test_support/sql/parquet_partitioned')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
@@ -285,7 +291,7 @@ class DataFrameReader(object):
@ignore_unicode_prefix
@since(1.6)
def text(self, paths):
- """Loads a text file and returns a [[DataFrame]] with a single string
column named "value".
+ """Loads a text file and returns a :class:`DataFrame` with a single
string column named "value".
If the directory structure of the text files contains partitioning
information,
those are ignored in the resulting DataFrame. To include partitioning
information as
columns, use ``read.format('text').load(...)``.
@@ -304,13 +310,14 @@ class DataFrameReader(object):
@since(2.0)
def csv(self, path, schema=None, sep=None, encoding=None, quote=None,
escape=None,
- comment=None, header=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None,
- nullValue=None, nanValue=None, positiveInf=None, negativeInf=None,
dateFormat=None,
- maxColumns=None, maxCharsPerColumn=None, mode=None):
- """Loads a CSV file and returns the result as a [[DataFrame]].
+ comment=None, header=None, inferSchema=None,
ignoreLeadingWhiteSpace=None,
+ ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None,
positiveInf=None,
+ negativeInf=None, dateFormat=None, maxColumns=None,
maxCharsPerColumn=None, mode=None):
+ """Loads a CSV file and returns the result as a :class:`DataFrame`.
- This function goes through the input once to determine the input
schema. To avoid going
- through the entire data once, specify the schema explicitly using
[[schema]].
+ This function will go through the input once to determine the input
schema if
+ ``inferSchema`` is enabled. To avoid going through the entire data
once, disable
+ ``inferSchema`` option or specify the schema explicitly using
``schema``.
:param path: string, or list of strings, for input path(s).
:param schema: an optional :class:`StructType` for the input schema.
@@ -328,6 +335,8 @@ class DataFrameReader(object):
character. By default (None), it is disabled.
:param header: uses the first line as names of columns. If None is
set, it uses the
default value, ``false``.
+ :param inferSchema: infers the input schema automatically from data.
It requires one extra
+ pass over the data. If None is set, it uses the default
value, ``false``.
:param ignoreLeadingWhiteSpace: defines whether or not leading
whitespaces from values
being read should be skipped. If None
is set, it uses
the default value, ``false``.
@@ -378,6 +387,8 @@ class DataFrameReader(object):
self.option("comment", comment)
if header is not None:
self.option("header", header)
+ if inferSchema is not None:
+ self.option("inferSchema", inferSchema)
if ignoreLeadingWhiteSpace is not None:
self.option("ignoreLeadingWhiteSpace", ignoreLeadingWhiteSpace)
if ignoreTrailingWhiteSpace is not None:
@@ -464,7 +475,7 @@ class DataFrameReader(object):
class DataFrameWriter(object):
"""
- Interface used to write a [[DataFrame]] to external storage systems
+ Interface used to write a :class:`DataFrame` to external storage systems
(e.g. file systems, key-value stores, etc). Use :func:`DataFrame.write`
to access this.
@@ -701,7 +712,7 @@ class DataFrameWriter(object):
In the case the table already exists, behavior of this function
depends on the
save mode, specified by the `mode` function (default to throwing an
exception).
- When `mode` is `Overwrite`, the schema of the [[DataFrame]] does not
need to be
+ When `mode` is `Overwrite`, the schema of the :class:`DataFrame` does
not need to be
the same as that of the existing table.
* `append`: Append contents of this :class:`DataFrame` to existing
data.
@@ -758,7 +769,9 @@ class DataFrameWriter(object):
:param partitionBy: names of partitioning columns
:param compression: compression codec to use when saving to file. This
can be one of the
known case-insensitive shorten names (none,
snappy, gzip, and lzo).
- This will overwrite
``spark.sql.parquet.compression.codec``.
+ This will override
``spark.sql.parquet.compression.codec``. If None
+ is set, it uses the value specified in
+ ``spark.sql.parquet.compression.codec``.
>>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
"""
@@ -788,7 +801,7 @@ class DataFrameWriter(object):
@since(2.0)
def csv(self, path, mode=None, compression=None, sep=None, quote=None,
escape=None,
header=None, nullValue=None, escapeQuotes=None):
- """Saves the content of the [[DataFrame]] in CSV format at the
specified path.
+ """Saves the content of the :class:`DataFrame` in CSV format at the
specified path.
:param path: the path in any Hadoop supported file system
:param mode: specifies the behavior of the save operation when data
already exists.
@@ -852,7 +865,8 @@ class DataFrameWriter(object):
:param partitionBy: names of partitioning columns
:param compression: compression codec to use when saving to file. This
can be one of the
known case-insensitive shorten names (none,
snappy, zlib, and lzo).
- This will overwrite ``orc.compress``.
+ This will override ``orc.compress``. If None is
set, it uses the
+ default value, ``snappy``.
>>> orc_df = spark.read.orc('python/test_support/sql/orc_partitioned')
>>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
http://git-wip-us.apache.org/repos/asf/spark/blob/9e204c62/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index bb5fa2b..078b63e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -304,9 +304,9 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
* <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
* <li>`FAILFAST` : throws an exception when it meets corrupted
records.</li>
* </ul>
- * <li>`columnNameOfCorruptRecord` (default `_corrupt_record`): allows
renaming the new field
- * having malformed string created by `PERMISSIVE` mode. This overrides
- * `spark.sql.columnNameOfCorruptRecord`.</li>
+ * <li>`columnNameOfCorruptRecord` (default is the value specified in
+ * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field
having malformed string
+ * created by `PERMISSIVE` mode. This overrides
`spark.sql.columnNameOfCorruptRecord`.</li>
*
* @since 1.6.0
*/
@@ -361,8 +361,9 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
/**
* Loads a CSV file and returns the result as a [[DataFrame]].
*
- * This function goes through the input once to determine the input schema.
To avoid going
- * through the entire data once, specify the schema explicitly using
[[schema]].
+ * This function will go through the input once to determine the input
schema if `inferSchema`
+ * is enabled. To avoid going through the entire data once, disable
`inferSchema` option or
+ * specify the schema explicitly using [[schema]].
*
* You can set the following CSV-specific options to deal with CSV files:
* <li>`sep` (default `,`): sets the single character as a separator for each
@@ -378,6 +379,8 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
* <li>`comment` (default empty string): sets the single character used for
skipping lines
* beginning with this character. By default, it is disabled.</li>
* <li>`header` (default `false`): uses the first line as names of
columns.</li>
+ * <li>`inferSchema` (default `false`): infers the input schema
automatically from data. It
+ * requires one extra pass over the data.</li>
* <li>`ignoreLeadingWhiteSpace` (default `false`): defines whether or not
leading whitespaces
* from values being read should be skipped.</li>
* <li>`ignoreTrailingWhiteSpace` (default `false`): defines whether or not
trailing
@@ -414,6 +417,11 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
* Loads a Parquet file, returning the result as a [[DataFrame]]. This
function returns an empty
* [[DataFrame]] if no paths are passed in.
*
+ * You can set the following Parquet-specific option(s) for reading Parquet
files:
+ * <li>`mergeSchema` (default is the value specified in
`spark.sql.parquet.mergeSchema`): sets
+ * whether we should merge schemas collected from all Parquet part-files.
This will override
+ * `spark.sql.parquet.mergeSchema`.</li>
+ *
* @since 1.4.0
*/
@scala.annotation.varargs
http://git-wip-us.apache.org/repos/asf/spark/blob/9e204c62/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 1c2003c..8c05a7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -725,9 +725,10 @@ final class DataFrameWriter[T] private[sql](ds:
Dataset[T]) {
* }}}
*
* You can set the following Parquet-specific option(s) for writing Parquet
files:
- * <li>`compression` (default `null`): compression codec to use when saving
to file. This can be
- * one of the known case-insensitive shorten names(`none`, `snappy`, `gzip`,
and `lzo`).
- * This will overwrite `spark.sql.parquet.compression.codec`. </li>
+ * <li>`compression` (default is the value specified in
`spark.sql.parquet.compression.codec`):
+ * compression codec to use when saving to file. This can be one of the
known case-insensitive
+ * shorten names(none, `snappy`, `gzip`, and `lzo`). This will override
+ * `spark.sql.parquet.compression.codec`.</li>
*
* @since 1.4.0
*/
@@ -744,9 +745,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T])
{
* }}}
*
* You can set the following ORC-specific option(s) for writing ORC files:
- * <li>`compression` (default `null`): compression codec to use when saving
to file. This can be
+ * <li>`compression` (default `snappy`): compression codec to use when
saving to file. This can be
* one of the known case-insensitive shorten names(`none`, `snappy`, `zlib`,
and `lzo`).
- * This will overwrite `orc.compress`. </li>
+ * This will override `orc.compress`.</li>
*
* @since 1.5.0
* @note Currently, this method can only be used after enabling Hive support
http://git-wip-us.apache.org/repos/asf/spark/blob/9e204c62/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 3735c94..2d4bef3 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -144,12 +144,10 @@ private[sql] class ParquetFileFormat
sparkSession: SparkSession,
parameters: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
+ val parquetOptions = new ParquetOptions(parameters,
sparkSession.sessionState.conf)
+
// Should we merge schemas from all Parquet part-files?
- val shouldMergeSchemas =
- parameters
- .get(ParquetFileFormat.MERGE_SCHEMA)
- .map(_.toBoolean)
-
.getOrElse(sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
+ val shouldMergeSchemas = parquetOptions.mergeSchema
val mergeRespectSummaries =
sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
@@ -558,17 +556,6 @@ private[sql] class ParquetOutputWriter(
}
private[sql] object ParquetFileFormat extends Logging {
- // Whether we should merge schemas collected from all Parquet part-files.
- private[sql] val MERGE_SCHEMA = "mergeSchema"
-
- // Hive Metastore schema, used when converting Metastore Parquet tables.
This option is only used
- // internally.
- private[sql] val METASTORE_SCHEMA = "metastoreSchema"
-
- // If a ParquetRelation is converted from a Hive metastore table, this
option is set to the
- // original Hive table name.
- private[sql] val METASTORE_TABLE_NAME = "metastoreTableName"
-
/**
* If parquet's block size (row group size) setting is larger than the min
split size,
* we use parquet's block size setting as the min split size. Otherwise, we
will create
http://git-wip-us.apache.org/repos/asf/spark/blob/9e204c62/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 1ff217c..dd2e915 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.internal.SQLConf
/**
* Options for the Parquet data source.
*/
-private[parquet] class ParquetOptions(
+private[sql] class ParquetOptions(
@transient private val parameters: Map[String, String],
@transient private val sqlConf: SQLConf)
extends Serializable {
@@ -44,10 +44,21 @@ private[parquet] class ParquetOptions(
}
shortParquetCompressionCodecNames(codecName).name()
}
+
+ /**
+ * Whether it merges schemas or not. When the given Parquet files have
different schemas,
+ * the schemas can be merged. By default use the value specified in SQLConf.
+ */
+ val mergeSchema: Boolean = parameters
+ .get(MERGE_SCHEMA)
+ .map(_.toBoolean)
+ .getOrElse(sqlConf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
}
-private[parquet] object ParquetOptions {
+private[sql] object ParquetOptions {
+ private[sql] val MERGE_SCHEMA = "mergeSchema"
+
// The parquet compression short names
private val shortParquetCompressionCodecNames = Map(
"none" -> CompressionCodecName.UNCOMPRESSED,
http://git-wip-us.apache.org/repos/asf/spark/blob/9e204c62/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f10afa7..d24cde2 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -23,7 +23,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader,
LoadingCache}
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import
org.apache.spark.sql.execution.command.CreateHiveTableAsSelectLogicalPlan
import org.apache.spark.sql.execution.datasources.{Partition => _, _}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
ParquetOptions}
import org.apache.spark.sql.hive.orc.OrcFileFormat
import org.apache.spark.sql.types._
@@ -355,13 +355,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession:
SparkSession) extends Log
val fileFormatClass = classOf[ParquetFileFormat]
val mergeSchema = sessionState.convertMetastoreParquetWithSchemaMerging
- val options = Map(
- ParquetFileFormat.MERGE_SCHEMA -> mergeSchema.toString,
- ParquetFileFormat.METASTORE_TABLE_NAME -> TableIdentifier(
- relation.tableName,
- Some(relation.databaseName)
- ).unquotedString
- )
+ val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString)
convertToLogicalRelation(relation, options, defaultSource,
fileFormatClass, "parquet")
}