This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d5e13234dfb [SPARK-45399][SQL] XML: Add XML Options using newOption
d5e13234dfb is described below
commit d5e13234dfb29566c33427ccd2c4318f7dcdb6d2
Author: Sandip Agarwala <[email protected]>
AuthorDate: Wed Oct 4 08:35:48 2023 +0900
[SPARK-45399][SQL] XML: Add XML Options using newOption
### What changes were proposed in this pull request?
Add XML Options using `DataSourceOptions::newOption`
Also, removed legacy date and timestamp support, as these are not applicable
to the newly introduced XML data source.
### Why are the changes needed?
Consistency with other formats like JSON and CSV.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
XML unit tests and GitHub Actions
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43201 from sandip-db/xml-cleanup-options.
Authored-by: Sandip Agarwala <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../spark/sql/catalyst/xml/StaxXmlParser.scala | 19 -----
.../apache/spark/sql/catalyst/xml/TypeCast.scala | 23 +-----
.../apache/spark/sql/catalyst/xml/XmlOptions.scala | 87 +++++++++++-----------
.../sql/execution/datasources/xml/XmlSuite.scala | 2 +-
4 files changed, 46 insertions(+), 85 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
index 71022ba281c..ac29e234e5f 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData,
BadRecordException
import org.apache.spark.sql.catalyst.xml.StaxXmlParser.convertStream
import org.apache.spark.sql.catalyst.xml.TypeCast._
import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -47,24 +46,6 @@ class StaxXmlParser(
private val factory = options.buildXmlFactory()
- // Flags to signal if we need to fall back to the backward compatible
behavior of parsing
- // dates and timestamps.
- // For more information, see comments for "enableDateTimeParsingFallback"
option in XmlOptions.
- private val enableParsingFallbackForTimestampType =
- options.enableDateTimeParsingFallback
- .orElse(SQLConf.get.jsonEnableDateTimeParsingFallback)
- .getOrElse {
- SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
- options.timestampFormatInRead.isEmpty
- }
- private val enableParsingFallbackForDateType =
- options.enableDateTimeParsingFallback
- .orElse(SQLConf.get.jsonEnableDateTimeParsingFallback)
- .getOrElse {
- SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
- options.dateFormatInRead.isEmpty
- }
-
/**
* Parses a single XML string and turns it into either one resulting row or
no row (if the
* the record is malformed).
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/TypeCast.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/TypeCast.scala
index b065dd41f28..3315196ffc7 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/TypeCast.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/TypeCast.scala
@@ -22,10 +22,7 @@ import java.util.Locale
import scala.util.Try
import scala.util.control.Exception._
-import scala.util.control.NonFatal
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -85,25 +82,7 @@ private[sql] object TypeCast {
}
private def parseXmlTimestamp(value: String, options: XmlOptions): Long = {
- try {
- options.timestampFormatter.parse(value)
- } catch {
- case NonFatal(e) =>
- // If fails to parse, then tries the way used in 2.0 and 1.x for
backwards
- // compatibility if enabled.
- val enableParsingFallbackForTimestampType =
- options.enableDateTimeParsingFallback
- .orElse(SQLConf.get.jsonEnableDateTimeParsingFallback)
- .getOrElse {
- SQLConf.get.legacyTimeParserPolicy ==
LegacyBehaviorPolicy.LEGACY ||
- options.timestampFormatInRead.isEmpty
- }
- if (!enableParsingFallbackForTimestampType) {
- throw e
- }
- val str =
DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(value))
- DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
- }
+ options.timestampFormatter.parse(value)
}
// TODO: This function unnecessarily does type dispatch. Should merge it
with `castTo`.
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
index 3aef9916a89..d0cfff87279 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
@@ -42,7 +42,7 @@ private[sql] class XmlOptions(
def this(
parameters: Map[String, String] = Map.empty,
defaultTimeZoneId: String = SQLConf.get.sessionLocalTimeZone,
- defaultColumnNameOfCorruptRecord: String = "") = {
+ defaultColumnNameOfCorruptRecord: String =
SQLConf.get.columnNameOfCorruptRecord) = {
this(
CaseInsensitiveMap(parameters),
defaultTimeZoneId,
@@ -62,42 +62,39 @@ private[sql] class XmlOptions(
}
}
- val compressionCodec =
parameters.get("compression").orElse(parameters.get("codec"))
- .map(CompressionCodecs.getCodecClassName)
- val rowTag = parameters.getOrElse("rowTag", XmlOptions.DEFAULT_ROW_TAG)
- require(rowTag.nonEmpty, "'rowTag' option should not be empty string.")
+ val compressionCodec =
parameters.get(COMPRESSION).map(CompressionCodecs.getCodecClassName)
+ val rowTag = parameters.getOrElse(ROW_TAG, XmlOptions.DEFAULT_ROW_TAG)
+ require(rowTag.nonEmpty, s"'$ROW_TAG' option should not be empty string.")
require(!rowTag.startsWith("<") && !rowTag.endsWith(">"),
- "'rowTag' should not include angle brackets")
- val rootTag = parameters.getOrElse("rootTag", XmlOptions.DEFAULT_ROOT_TAG)
+ s"'$ROW_TAG' should not include angle brackets")
+ val rootTag = parameters.getOrElse(ROOT_TAG, XmlOptions.DEFAULT_ROOT_TAG)
require(!rootTag.startsWith("<") && !rootTag.endsWith(">"),
- "'rootTag' should not include angle brackets")
- val declaration = parameters.getOrElse("declaration",
XmlOptions.DEFAULT_DECLARATION)
+ s"'$ROOT_TAG' should not include angle brackets")
+ val declaration = parameters.getOrElse(DECLARATION,
XmlOptions.DEFAULT_DECLARATION)
require(!declaration.startsWith("<") && !declaration.endsWith(">"),
- "'declaration' should not include angle brackets")
- val arrayElementName = parameters.getOrElse("arrayElementName",
+ s"'$DECLARATION' should not include angle brackets")
+ val arrayElementName = parameters.getOrElse(ARRAY_ELEMENT_NAME,
XmlOptions.DEFAULT_ARRAY_ELEMENT_NAME)
- val samplingRatio =
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
- require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be
greater than 0")
- val excludeAttributeFlag =
parameters.get("excludeAttribute").map(_.toBoolean).getOrElse(false)
- val treatEmptyValuesAsNulls =
- parameters.get("treatEmptyValuesAsNulls").map(_.toBoolean).getOrElse(false)
+ val samplingRatio =
parameters.get(SAMPLING_RATIO).map(_.toDouble).getOrElse(1.0)
+ require(samplingRatio > 0, s"$SAMPLING_RATIO ($samplingRatio) should be
greater than 0")
+ val excludeAttributeFlag = getBool(EXCLUDE_ATTRIBUTE, false)
+ val treatEmptyValuesAsNulls = getBool(TREAT_EMPTY_VALUE_AS_NULLS, false)
val attributePrefix =
- parameters.getOrElse("attributePrefix",
XmlOptions.DEFAULT_ATTRIBUTE_PREFIX)
- val valueTag = parameters.getOrElse("valueTag", XmlOptions.DEFAULT_VALUE_TAG)
- require(valueTag.nonEmpty, "'valueTag' option should not be empty string.")
+ parameters.getOrElse(ATTRIBUTE_PREFIX, XmlOptions.DEFAULT_ATTRIBUTE_PREFIX)
+ val valueTag = parameters.getOrElse(VALUE_TAG, XmlOptions.DEFAULT_VALUE_TAG)
+ require(valueTag.nonEmpty, s"'$VALUE_TAG' option should not be empty
string.")
require(valueTag != attributePrefix,
- "'valueTag' and 'attributePrefix' options should not be the same.")
- val nullValue = parameters.getOrElse("nullValue",
XmlOptions.DEFAULT_NULL_VALUE)
+ s"'$VALUE_TAG' and '$ATTRIBUTE_PREFIX' options should not be the same.")
+ val nullValue = parameters.getOrElse(NULL_VALUE,
XmlOptions.DEFAULT_NULL_VALUE)
val columnNameOfCorruptRecord =
- parameters.getOrElse("columnNameOfCorruptRecord", "_corrupt_record")
- val ignoreSurroundingSpaces =
- parameters.get("ignoreSurroundingSpaces").map(_.toBoolean).getOrElse(false)
- val parseMode = ParseMode.fromString(parameters.getOrElse("mode",
PermissiveMode.name))
- val inferSchema =
parameters.get("inferSchema").map(_.toBoolean).getOrElse(true)
- val rowValidationXSDPath = parameters.get("rowValidationXSDPath").orNull
+ parameters.getOrElse(COLUMN_NAME_OF_CORRUPT_RECORD,
defaultColumnNameOfCorruptRecord)
+ val ignoreSurroundingSpaces = getBool(IGNORE_SURROUNDING_SPACES, false)
+ val parseMode = ParseMode.fromString(parameters.getOrElse(MODE,
PermissiveMode.name))
+ val inferSchema = getBool(INFER_SCHEMA, true)
+ val rowValidationXSDPath = parameters.get(ROW_VALIDATION_XSD_PATH).orNull
val wildcardColName =
- parameters.getOrElse("wildcardColName",
XmlOptions.DEFAULT_WILDCARD_COL_NAME)
- val ignoreNamespace =
parameters.get("ignoreNamespace").map(_.toBoolean).getOrElse(false)
+ parameters.getOrElse(WILDCARD_COL_NAME,
XmlOptions.DEFAULT_WILDCARD_COL_NAME)
+ val ignoreNamespace = getBool(IGNORE_NAMESPACE, false)
/**
* Infer columns with all valid date entries as date type (otherwise
inferred as string or
@@ -142,17 +139,6 @@ private[sql] class XmlOptions(
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
})
- // SPARK-39731: Enables the backward compatible parsing behavior.
- // Generally, this config should be set to false to avoid producing
potentially incorrect results
- // which is the current default (see JacksonParser).
- //
- // If enabled and the date cannot be parsed, we will fall back to
`DateTimeUtils.stringToDate`.
- // If enabled and the timestamp cannot be parsed,
`DateTimeUtils.stringToTimestamp` will be used.
- // Otherwise, depending on the parser policy and a custom pattern, an
exception may be thrown and
- // the value will be parsed as null.
- val enableDateTimeParsingFallback: Option[Boolean] =
- parameters.get(ENABLE_DATETIME_PARSING_FALLBACK).map(_.toBoolean)
-
val timezone = parameters.get("timezone")
val zoneId: ZoneId = DateTimeUtils.getZoneId(
@@ -207,19 +193,34 @@ private[sql] object XmlOptions extends DataSourceOptions {
val DEFAULT_CHARSET: String = StandardCharsets.UTF_8.name
val DEFAULT_NULL_VALUE: String = null
val DEFAULT_WILDCARD_COL_NAME = "xs_any"
+ val ROW_TAG = newOption("rowTag")
+ val ROOT_TAG = newOption("rootTag")
+ val DECLARATION = newOption("declaration")
+ val ARRAY_ELEMENT_NAME = newOption("arrayElementName")
+ val EXCLUDE_ATTRIBUTE = newOption("excludeAttribute")
+ val TREAT_EMPTY_VALUE_AS_NULLS = newOption("treatEmptyValuesAsNulls")
+ val ATTRIBUTE_PREFIX = newOption("attributePrefix")
+ val VALUE_TAG = newOption("valueTag")
+ val NULL_VALUE = newOption("nullValue")
+ val IGNORE_SURROUNDING_SPACES = newOption("ignoreSurroundingSpaces")
+ val ROW_VALIDATION_XSD_PATH = newOption("rowValidationXSDPath")
+ val WILDCARD_COL_NAME = newOption("wildcardColName")
+ val IGNORE_NAMESPACE = newOption("ignoreNamespace")
+ val INFER_SCHEMA = newOption("inferSchema")
val PREFER_DATE = newOption("preferDate")
+ val MODE = newOption("mode")
val LOCALE = newOption("locale")
val COMPRESSION = newOption("compression")
- val ENABLE_DATETIME_PARSING_FALLBACK =
newOption("enableDateTimeParsingFallback")
val MULTI_LINE = newOption("multiLine")
+ val SAMPLING_RATIO = newOption("samplingRatio")
+ val COLUMN_NAME_OF_CORRUPT_RECORD = newOption("columnNameOfCorruptRecord")
val DATE_FORMAT = newOption("dateFormat")
val TIMESTAMP_FORMAT = newOption("timestampFormat")
+ val TIME_ZONE = newOption("timeZone")
// Options with alternative
val ENCODING = "encoding"
val CHARSET = "charset"
newOption(ENCODING, CHARSET)
- val TIME_ZONE = "timezone"
- newOption(DateTimeUtils.TIMEZONE_OPTION, TIME_ZONE)
def apply(parameters: Map[String, String]): XmlOptions =
new XmlOptions(parameters, SQLConf.get.sessionLocalTimeZone)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
index 7c0e8c6785f..aed0939f04d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
@@ -332,7 +332,7 @@ class XmlSuite extends QueryTest with SharedSparkSession {
val cars = spark.read.xml(getTestResourcePath(resDir + "cars.xml"))
cars.write
.mode(SaveMode.Overwrite)
- .options(Map("codec" -> classOf[GzipCodec].getName))
+ .options(Map("compression" -> classOf[GzipCodec].getName))
.xml(copyFilePath.toString)
// Check that the part file has a .gz extension
assert(Files.list(copyFilePath).iterator().asScala
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]