This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7db0af5 [SPARK-30668][SQL][FOLLOWUP] Raise exception instead of
silent change for new DateFormatter
7db0af5 is described below
commit 7db0af578585ecaeee9fd23f8189292289b52a97
Author: Yuanjian Li <[email protected]>
AuthorDate: Thu Mar 5 15:29:39 2020 +0800
[SPARK-30668][SQL][FOLLOWUP] Raise exception instead of silent change for
new DateFormatter
### What changes were proposed in this pull request?
This is a follow-up to #27441. For cases where the new
TimestampFormatter returns null while the legacy formatter can return a value,
we need to throw an exception instead of silently changing the result. The
legacy config is referenced in the error message.
### Why are the changes needed?
Avoid silently changing results under the new behavior in 3.0.
### Does this PR introduce any user-facing change?
Yes, an exception is thrown when we detect that the legacy formatter can parse
the string but the new formatter returns null.
### How was this patch tested?
Extended the existing unit tests.
Closes #27537 from xuanyuanking/SPARK-30668-follow.
Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../scala/org/apache/spark/SparkException.scala | 7 +++
.../spark/sql/catalyst/csv/UnivocityParser.scala | 2 +
.../catalyst/expressions/datetimeExpressions.scala | 3 +
.../spark/sql/catalyst/json/JacksonParser.scala | 2 +
.../spark/sql/catalyst/util/DateFormatter.scala | 59 ++++++++++-------
.../catalyst/util/DateTimeFormatterHelper.scala | 26 +++++++-
.../sql/catalyst/util/TimestampFormatter.scala | 73 +++++++++++++---------
.../org/apache/spark/sql/internal/SQLConf.scala | 16 ++++-
.../expressions/DateExpressionsSuite.scala | 39 +++++++++---
.../sql/catalyst/json/JsonInferSchemaSuite.scala | 16 ++---
.../org/apache/spark/sql/CsvFunctionsSuite.scala | 20 ++++--
.../org/apache/spark/sql/DateFunctionsSuite.scala | 58 ++++++++++-------
.../sql/execution/datasources/csv/CSVSuite.scala | 22 ++++++-
.../sql/execution/datasources/json/JsonSuite.scala | 22 ++++++-
14 files changed, 269 insertions(+), 96 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala
b/core/src/main/scala/org/apache/spark/SparkException.scala
index 4ad9a0c..81c087e 100644
--- a/core/src/main/scala/org/apache/spark/SparkException.scala
+++ b/core/src/main/scala/org/apache/spark/SparkException.scala
@@ -43,3 +43,10 @@ private[spark] case class SparkUserAppException(exitCode:
Int)
*/
private[spark] case class ExecutorDeadException(message: String)
extends SparkException(message)
+
+/**
+ * Exception thrown when Spark returns different result after upgrading to a
new version.
+ */
+private[spark] class SparkUpgradeException(version: String, message: String,
cause: Throwable)
+ extends SparkException("You may get a different result due to the upgrading
of Spark" +
+ s" $version: $message", cause)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index f829e6b..dd8537b 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -23,6 +23,7 @@ import scala.util.control.NonFatal
import com.univocity.parsers.csv.CsvParser
+import org.apache.spark.SparkUpgradeException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{ExprUtils,
GenericInternalRow}
@@ -285,6 +286,7 @@ class UnivocityParser(
}
}
} catch {
+ case e: SparkUpgradeException => throw e
case NonFatal(e) =>
badRecordException = badRecordException.orElse(Some(e))
row.setNullAt(i)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 767dacf..81815fc 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -26,6 +26,7 @@ import scala.util.control.NonFatal
import org.apache.commons.text.StringEscapeUtils
+import org.apache.spark.SparkUpgradeException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen._
@@ -789,6 +790,7 @@ abstract class ToTimestamp
formatter.parse(
t.asInstanceOf[UTF8String].toString) / downScaleFactor
} catch {
+ case e: SparkUpgradeException => throw e
case NonFatal(_) => null
}
}
@@ -802,6 +804,7 @@ abstract class ToTimestamp
TimestampFormatter(formatString, zoneId, legacyFormat =
SIMPLE_DATE_FORMAT)
.parse(t.asInstanceOf[UTF8String].toString) / downScaleFactor
} catch {
+ case e: SparkUpgradeException => throw e
case NonFatal(_) => null
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index da3b501..d0db06c 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -25,6 +25,7 @@ import scala.util.control.NonFatal
import com.fasterxml.jackson.core._
+import org.apache.spark.SparkUpgradeException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -382,6 +383,7 @@ class JacksonParser(
try {
row.update(index, fieldConverters(index).apply(parser))
} catch {
+ case e: SparkUpgradeException => throw e
case NonFatal(e) =>
badRecordException = badRecordException.orElse(Some(e))
parser.skipChildren()
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
index 941c8fc..06ec918 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
@@ -23,9 +23,9 @@ import java.util.{Date, Locale}
import org.apache.commons.lang3.time.FastDateFormat
-import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
sealed trait DateFormatter extends Serializable {
def parse(s: String): Int // returns days since epoch
@@ -35,16 +35,24 @@ sealed trait DateFormatter extends Serializable {
class Iso8601DateFormatter(
pattern: String,
zoneId: ZoneId,
- locale: Locale) extends DateFormatter with DateTimeFormatterHelper {
+ locale: Locale,
+ legacyFormat: LegacyDateFormats.LegacyDateFormat)
+ extends DateFormatter with DateTimeFormatterHelper {
@transient
private lazy val formatter = getOrCreateFormatter(pattern, locale)
+ @transient
+ private lazy val legacyFormatter = DateFormatter.getLegacyFormatter(
+ pattern, zoneId, locale, legacyFormat)
+
override def parse(s: String): Int = {
val specialDate = convertSpecialDate(s.trim, zoneId)
specialDate.getOrElse {
- val localDate = LocalDate.parse(s, formatter)
- localDateToDays(localDate)
+ try {
+ val localDate = LocalDate.parse(s, formatter)
+ localDateToDays(localDate)
+ } catch checkDiffResult(s, legacyFormatter.parse)
}
}
@@ -88,33 +96,40 @@ object DateFormatter {
val defaultLocale: Locale = Locale.US
def defaultPattern(): String = {
- if (SQLConf.get.legacyTimeParserEnabled) "yyyy-MM-dd" else "uuuu-MM-dd"
+ if (SQLConf.get.legacyTimeParserPolicy == LEGACY) "yyyy-MM-dd" else
"uuuu-MM-dd"
}
private def getFormatter(
- format: Option[String],
- zoneId: ZoneId,
- locale: Locale = defaultLocale,
- legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT):
DateFormatter = {
-
+ format: Option[String],
+ zoneId: ZoneId,
+ locale: Locale = defaultLocale,
+ legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT):
DateFormatter = {
val pattern = format.getOrElse(defaultPattern)
- if (SQLConf.get.legacyTimeParserEnabled) {
- legacyFormat match {
- case FAST_DATE_FORMAT =>
- new LegacyFastDateFormatter(pattern, locale)
- case SIMPLE_DATE_FORMAT | LENIENT_SIMPLE_DATE_FORMAT =>
- new LegacySimpleDateFormatter(pattern, locale)
- }
+ if (SQLConf.get.legacyTimeParserPolicy == LEGACY) {
+ getLegacyFormatter(pattern, zoneId, locale, legacyFormat)
} else {
- new Iso8601DateFormatter(pattern, zoneId, locale)
+ new Iso8601DateFormatter(pattern, zoneId, locale, legacyFormat)
+ }
+ }
+
+ def getLegacyFormatter(
+ pattern: String,
+ zoneId: ZoneId,
+ locale: Locale,
+ legacyFormat: LegacyDateFormat): DateFormatter = {
+ legacyFormat match {
+ case FAST_DATE_FORMAT =>
+ new LegacyFastDateFormatter(pattern, locale)
+ case SIMPLE_DATE_FORMAT | LENIENT_SIMPLE_DATE_FORMAT =>
+ new LegacySimpleDateFormatter(pattern, locale)
}
}
def apply(
- format: String,
- zoneId: ZoneId,
- locale: Locale,
- legacyFormat: LegacyDateFormat): DateFormatter = {
+ format: String,
+ zoneId: ZoneId,
+ locale: Locale,
+ legacyFormat: LegacyDateFormat): DateFormatter = {
getFormatter(Some(format), zoneId, locale, legacyFormat)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
index a7b6309..33aa733 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
@@ -19,13 +19,16 @@ package org.apache.spark.sql.catalyst.util
import java.time._
import java.time.chrono.IsoChronology
-import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder,
ResolverStyle}
+import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder,
DateTimeParseException, ResolverStyle}
import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
import java.util.Locale
import com.google.common.cache.CacheBuilder
+import org.apache.spark.SparkUpgradeException
import org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
trait DateTimeFormatterHelper {
// Converts the parsed temporal object to ZonedDateTime. It sets time
components to zeros
@@ -57,6 +60,27 @@ trait DateTimeFormatterHelper {
}
formatter
}
+
+ // When legacy time parser policy set to EXCEPTION, check whether we will
get different results
+ // between legacy parser and new parser. If new parser fails but legacy
parser works, throw a
+ // SparkUpgradeException. On the contrary, if the legacy policy set to
CORRECTED,
+ // DateTimeParseException will address by the caller side.
+ protected def checkDiffResult[T](
+ s: String, legacyParseFunc: String => T): PartialFunction[Throwable, T]
= {
+ case e: DateTimeParseException if SQLConf.get.legacyTimeParserPolicy ==
EXCEPTION =>
+ val res = try {
+ Some(legacyParseFunc(s))
+ } catch {
+ case _: Throwable => None
+ }
+ if (res.nonEmpty) {
+ throw new SparkUpgradeException("3.0", s"Fail to parse '$s' in the new
parser. You can " +
+ s"set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore
the behavior " +
+ s"before Spark 3.0, or set to CORRECTED and treat it as an invalid
datetime string.", e)
+ } else {
+ throw e
+ }
+ }
}
private object DateTimeFormatterHelper {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index b70a4ed..5c1a161 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -29,7 +29,9 @@ import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
+import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{LegacyDateFormat,
LENIENT_SIMPLE_DATE_FORMAT}
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
import org.apache.spark.sql.types.Decimal
sealed trait TimestampFormatter extends Serializable {
@@ -52,21 +54,29 @@ sealed trait TimestampFormatter extends Serializable {
class Iso8601TimestampFormatter(
pattern: String,
zoneId: ZoneId,
- locale: Locale) extends TimestampFormatter with DateTimeFormatterHelper {
+ locale: Locale,
+ legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT)
+ extends TimestampFormatter with DateTimeFormatterHelper {
@transient
protected lazy val formatter = getOrCreateFormatter(pattern, locale)
+ @transient
+ protected lazy val legacyFormatter = TimestampFormatter.getLegacyFormatter(
+ pattern, zoneId, locale, legacyFormat)
+
override def parse(s: String): Long = {
val specialDate = convertSpecialTimestamp(s.trim, zoneId)
specialDate.getOrElse {
- val parsed = formatter.parse(s)
- val parsedZoneId = parsed.query(TemporalQueries.zone())
- val timeZoneId = if (parsedZoneId == null) zoneId else parsedZoneId
- val zonedDateTime = toZonedDateTime(parsed, timeZoneId)
- val epochSeconds = zonedDateTime.toEpochSecond
- val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND)
-
- Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond)
+ try {
+ val parsed = formatter.parse(s)
+ val parsedZoneId = parsed.query(TemporalQueries.zone())
+ val timeZoneId = if (parsedZoneId == null) zoneId else parsedZoneId
+ val zonedDateTime = toZonedDateTime(parsed, timeZoneId)
+ val epochSeconds = zonedDateTime.toEpochSecond
+ val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND)
+
+ Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond)
+ } catch checkDiffResult(s, legacyFormatter.parse)
}
}
@@ -186,31 +196,38 @@ object TimestampFormatter {
def defaultPattern(): String = s"${DateFormatter.defaultPattern()} HH:mm:ss"
private def getFormatter(
- format: Option[String],
- zoneId: ZoneId,
- locale: Locale = defaultLocale,
- legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT):
TimestampFormatter = {
-
+ format: Option[String],
+ zoneId: ZoneId,
+ locale: Locale = defaultLocale,
+ legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT):
TimestampFormatter = {
val pattern = format.getOrElse(defaultPattern)
- if (SQLConf.get.legacyTimeParserEnabled) {
- legacyFormat match {
- case FAST_DATE_FORMAT =>
- new LegacyFastTimestampFormatter(pattern, zoneId, locale)
- case SIMPLE_DATE_FORMAT =>
- new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient
= false)
- case LENIENT_SIMPLE_DATE_FORMAT =>
- new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient
= true)
- }
+ if (SQLConf.get.legacyTimeParserPolicy == LEGACY) {
+ getLegacyFormatter(pattern, zoneId, locale, legacyFormat)
} else {
- new Iso8601TimestampFormatter(pattern, zoneId, locale)
+ new Iso8601TimestampFormatter(pattern, zoneId, locale, legacyFormat)
+ }
+ }
+
+ def getLegacyFormatter(
+ pattern: String,
+ zoneId: ZoneId,
+ locale: Locale,
+ legacyFormat: LegacyDateFormat): TimestampFormatter = {
+ legacyFormat match {
+ case FAST_DATE_FORMAT =>
+ new LegacyFastTimestampFormatter(pattern, zoneId, locale)
+ case SIMPLE_DATE_FORMAT =>
+ new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient =
false)
+ case LENIENT_SIMPLE_DATE_FORMAT =>
+ new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient =
true)
}
}
def apply(
- format: String,
- zoneId: ZoneId,
- locale: Locale,
- legacyFormat: LegacyDateFormat): TimestampFormatter = {
+ format: String,
+ zoneId: ZoneId,
+ locale: Locale,
+ legacyFormat: LegacyDateFormat): TimestampFormatter = {
getFormatter(Some(format), zoneId, locale, legacyFormat)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7f55f22..2d17fb9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2352,6 +2352,18 @@ object SQLConf {
.checkValues(LegacyBehaviorPolicy.values.map(_.toString))
.createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+ val LEGACY_TIME_PARSER_POLICY =
buildConf("spark.sql.legacy.timeParserPolicy")
+ .internal()
+ .doc("When LEGACY, java.text.SimpleDateFormat is used for formatting and
parsing " +
+ "dates/timestamps in a locale-sensitive manner, which is the approach
before Spark 3.0. " +
+ "When set to CORRECTED, classes from java.time.* packages are used for
the same purpose. " +
+ "The default value is EXCEPTION, RuntimeException is thrown when we will
get different " +
+ "results.")
+ .stringConf
+ .transform(_.toUpperCase(Locale.ROOT))
+ .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+ .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+
val LEGACY_ARRAY_EXISTS_FOLLOWS_THREE_VALUED_LOGIC =
buildConf("spark.sql.legacy.followThreeValuedLogicInArrayExists")
.internal()
@@ -2743,7 +2755,9 @@ class SQLConf extends Serializable with Logging {
def legacyMsSqlServerNumericMappingEnabled: Boolean =
getConf(LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED)
- def legacyTimeParserEnabled: Boolean =
getConf(SQLConf.LEGACY_TIME_PARSER_ENABLED)
+ def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = {
+ LegacyBehaviorPolicy.withName(getConf(SQLConf.LEGACY_TIME_PARSER_POLICY))
+ }
/**
* Returns the [[Resolver]] for the current configuration, which can be used
to determine if two
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index e43eb59..7fced04 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -23,7 +23,7 @@ import java.time.{Instant, LocalDate, LocalDateTime, ZoneId,
ZoneOffset}
import java.util.{Calendar, Locale, TimeZone}
import java.util.concurrent.TimeUnit._
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUpgradeException}
import org.apache.spark.sql.catalyst.InternalRow
import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils,
TimestampFormatter}
@@ -241,8 +241,8 @@ class DateExpressionsSuite extends SparkFunSuite with
ExpressionEvalHelper {
}
test("DateFormat") {
- Seq(false, true).foreach { legacyParser =>
- withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key ->
legacyParser.toString) {
+ Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy)
{
checkEvaluation(
DateFormatClass(Literal.create(null, TimestampType), Literal("y"),
gmtId),
null)
@@ -710,8 +710,8 @@ class DateExpressionsSuite extends SparkFunSuite with
ExpressionEvalHelper {
}
test("from_unixtime") {
- Seq(false, true).foreach { legacyParser =>
- withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key ->
legacyParser.toString) {
+ Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy)
{
val fmt1 = "yyyy-MM-dd HH:mm:ss"
val sdf1 = new SimpleDateFormat(fmt1, Locale.US)
val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
@@ -758,8 +758,8 @@ class DateExpressionsSuite extends SparkFunSuite with
ExpressionEvalHelper {
}
test("unix_timestamp") {
- Seq(false, true).foreach { legacyParser =>
- withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key ->
legacyParser.toString) {
+ Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy)
{
val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
@@ -824,8 +824,8 @@ class DateExpressionsSuite extends SparkFunSuite with
ExpressionEvalHelper {
}
test("to_unix_timestamp") {
- Seq(false, true).foreach { legacyParser =>
- withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key ->
legacyParser.toString) {
+ Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy)
{
val fmt1 = "yyyy-MM-dd HH:mm:ss"
val sdf1 = new SimpleDateFormat(fmt1, Locale.US)
val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
@@ -1164,4 +1164,25 @@ class DateExpressionsSuite extends SparkFunSuite with
ExpressionEvalHelper {
Literal(LocalDate.of(1, 1, 1))),
IntervalUtils.stringToInterval(UTF8String.fromString("interval 9999
years")))
}
+
+ test("to_timestamp exception mode") {
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") {
+ checkEvaluation(
+ GetTimestamp(
+ Literal("2020-01-27T20:06:11.847-0800"),
+ Literal("yyyy-MM-dd'T'HH:mm:ss.SSSz")), 1580184371847000L)
+ }
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") {
+ checkEvaluation(
+ GetTimestamp(
+ Literal("2020-01-27T20:06:11.847-0800"),
+ Literal("yyyy-MM-dd'T'HH:mm:ss.SSSz")), null)
+ }
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") {
+ checkExceptionInExpression[SparkUpgradeException](
+ GetTimestamp(
+ Literal("2020-01-27T20:06:11.847-0800"),
+ Literal("yyyy-MM-dd'T'HH:mm:ss.SSSz")), "Fail to parse")
+ }
+ }
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
index c2e03bd..bce917c 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
@@ -40,8 +40,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with
SQLHelper {
}
test("inferring timestamp type") {
- Seq(true, false).foreach { legacyParser =>
- withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key ->
legacyParser.toString) {
+ Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy)
{
checkTimestampType("yyyy", """{"a": "2018"}""")
checkTimestampType("yyyy=MM", """{"a": "2018=12"}""")
checkTimestampType("yyyy MM dd", """{"a": "2018 12 02"}""")
@@ -56,8 +56,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with
SQLHelper {
}
test("prefer decimals over timestamps") {
- Seq(true, false).foreach { legacyParser =>
- withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key ->
legacyParser.toString) {
+ Seq("legacy", "corrected").foreach { legacyParser =>
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParser) {
checkType(
options = Map(
"prefersDecimal" -> "true",
@@ -71,8 +71,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with
SQLHelper {
}
test("skip decimal type inferring") {
- Seq(true, false).foreach { legacyParser =>
- withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key ->
legacyParser.toString) {
+ Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy)
{
checkType(
options = Map(
"prefersDecimal" -> "false",
@@ -86,8 +86,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with
SQLHelper {
}
test("fallback to string type") {
- Seq(true, false).foreach { legacyParser =>
- withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key ->
legacyParser.toString) {
+ Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy)
{
checkType(
options = Map("timestampFormat" -> "yyyy,MM,dd.HHmmssSSS"),
json = """{"a": "20181202.210400123"}""",
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 61f0e13..89fb4d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -59,10 +59,22 @@ class CsvFunctionsSuite extends QueryTest with
SharedSparkSession {
val df2 = df
.select(from_csv($"value", schemaWithCorrField1, Map(
"mode" -> "Permissive", "columnNameOfCorruptRecord" ->
columnNameOfCorruptRecord)))
-
- checkAnswer(df2, Seq(
- Row(Row(0, null, "0,2013-111-11 12:13:14")),
- Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null))))
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") {
+ checkAnswer(df2, Seq(
+ Row(Row(0, null, "0,2013-111-11 12:13:14")),
+ Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null))))
+ }
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") {
+ checkAnswer(df2, Seq(
+ Row(Row(0, java.sql.Date.valueOf("2022-03-11"), null)),
+ Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null))))
+ }
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") {
+ val msg = intercept[SparkException] {
+ df2.collect()
+ }.getCause.getMessage
+ assert(msg.contains("Fail to parse"))
+ }
}
test("schema_of_csv - infers schemas") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index fd65f75..3865012 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -23,7 +23,8 @@ import java.time.{Instant, LocalDateTime, ZoneId}
import java.util.{Locale, TimeZone}
import java.util.concurrent.TimeUnit
-import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -96,8 +97,8 @@ class DateFunctionsSuite extends QueryTest with
SharedSparkSession {
}
test("date format") {
- Seq(false, true).foreach { legacyParser =>
- withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key ->
legacyParser.toString) {
+ Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy)
{
val df = Seq((d, sdf.format(d), ts)).toDF("a", "b", "c")
checkAnswer(
@@ -377,6 +378,13 @@ class DateFunctionsSuite extends QueryTest with
SharedSparkSession {
Seq(Row(Date.valueOf("2015-07-30")), Row(Date.valueOf("2015-07-30"))))
}
+ def checkExceptionMessage(df: DataFrame): Unit = {
+ val message = intercept[SparkException] {
+ df.collect()
+ }.getCause.getMessage
+ assert(message.contains("Fail to parse"))
+ }
+
test("function to_date") {
val d1 = Date.valueOf("2015-07-22")
val d2 = Date.valueOf("2015-07-01")
@@ -422,9 +430,15 @@ class DateFunctionsSuite extends QueryTest with
SharedSparkSession {
df.select(to_date(col("d"), "yyyy-MM-dd")),
Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2015-07-01")),
Row(Date.valueOf("2014-12-31"))))
- checkAnswer(
- df.select(to_date(col("s"), "yyyy-MM-dd")),
- Seq(Row(null), Row(Date.valueOf("2014-12-31")), Row(null)))
+ val confKey = SQLConf.LEGACY_TIME_PARSER_POLICY.key
+ withSQLConf(confKey -> "corrected") {
+ checkAnswer(
+ df.select(to_date(col("s"), "yyyy-MM-dd")),
+ Seq(Row(null), Row(Date.valueOf("2014-12-31")), Row(null)))
+ }
+ withSQLConf(confKey -> "exception") {
+ checkExceptionMessage(df.select(to_date(col("s"), "yyyy-MM-dd")))
+ }
// now switch format
checkAnswer(
@@ -529,8 +543,8 @@ class DateFunctionsSuite extends QueryTest with
SharedSparkSession {
}
test("from_unixtime") {
- Seq(false, true).foreach { legacyParser =>
- withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key ->
legacyParser.toString) {
+ Seq("corrected", "legacy").foreach { legacyParserPolicy =>
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy)
{
val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
@@ -562,8 +576,8 @@ class DateFunctionsSuite extends QueryTest with
SharedSparkSession {
private def secs(millis: Long): Long =
TimeUnit.MILLISECONDS.toSeconds(millis)
test("unix_timestamp") {
- Seq(false, true).foreach { legacyParser =>
- withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key ->
legacyParser.toString) {
+ Seq("corrected", "legacy").foreach { legacyParserPolicy =>
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy)
{
val date1 = Date.valueOf("2015-07-24")
val date2 = Date.valueOf("2015-07-25")
val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3")
@@ -629,8 +643,8 @@ class DateFunctionsSuite extends QueryTest with
SharedSparkSession {
}
test("to_unix_timestamp") {
- Seq(false, true).foreach { legacyParser =>
- withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key ->
legacyParser.toString) {
+ Seq("corrected", "legacy").foreach { legacyParserPolicy =>
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy)
{
val date1 = Date.valueOf("2015-07-24")
val date2 = Date.valueOf("2015-07-25")
val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3")
@@ -680,8 +694,8 @@ class DateFunctionsSuite extends QueryTest with
SharedSparkSession {
test("to_timestamp") {
- Seq(false, true).foreach { legacyParser =>
- withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key ->
legacyParser.toString) {
+ Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy)
{
val date1 = Date.valueOf("2015-07-24")
val date2 = Date.valueOf("2015-07-25")
val ts_date1 = Timestamp.valueOf("2015-07-24 00:00:00")
@@ -701,7 +715,7 @@ class DateFunctionsSuite extends QueryTest with
SharedSparkSession {
df.select(unix_timestamp(col("ss")).cast("timestamp")))
checkAnswer(df.select(to_timestamp(col("ss"))), Seq(
Row(ts1), Row(ts2)))
- if (legacyParser) {
+ if (legacyParserPolicy == "legacy") {
// In Spark 2.4 and earlier, to_timestamp() parses in seconds
precision and cuts off
// the fractional part of seconds. The behavior was changed by
SPARK-27438.
val legacyFmt = "yyyy/MM/dd HH:mm:ss"
@@ -819,16 +833,18 @@ class DateFunctionsSuite extends QueryTest with
SharedSparkSession {
}
test("SPARK-30668: use legacy timestamp parser in to_timestamp") {
- def checkTimeZoneParsing(expected: Any): Unit = {
- val df = Seq("2020-01-27T20:06:11.847-0800").toDF("ts")
+ val confKey = SQLConf.LEGACY_TIME_PARSER_POLICY.key
+ val df = Seq("2020-01-27T20:06:11.847-0800").toDF("ts")
+ withSQLConf(confKey -> "legacy") {
+ val expected = Timestamp.valueOf("2020-01-27 20:06:11.847")
checkAnswer(df.select(to_timestamp(col("ts"),
"yyyy-MM-dd'T'HH:mm:ss.SSSz")),
Row(expected))
}
- withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") {
- checkTimeZoneParsing(Timestamp.valueOf("2020-01-27 20:06:11.847"))
+ withSQLConf(confKey -> "corrected") {
+ checkAnswer(df.select(to_timestamp(col("ts"),
"yyyy-MM-dd'T'HH:mm:ss.SSSz")), Row(null))
}
- withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "false") {
- checkTimeZoneParsing(null)
+ withSQLConf(confKey -> "exception") {
+ checkExceptionMessage(df.select(to_timestamp(col("ts"),
"yyyy-MM-dd'T'HH:mm:ss.SSSz")))
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 43553df..30ae9dc 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2307,6 +2307,26 @@ abstract class CSVSuite extends QueryTest with
SharedSparkSession with TestCsvDa
val csv = spark.read.option("header", false).schema("t timestamp, d
date").csv(ds)
checkAnswer(csv, Row(Timestamp.valueOf("2020-1-12 3:23:34.12"),
Date.valueOf("2020-1-12")))
}
+
+ test("exception mode for parsing date/timestamp string") {
+ val ds = Seq("2020-01-27T20:06:11.847-0800").toDS()
+ val csv = spark.read
+ .option("header", false)
+ .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSz")
+ .schema("t timestamp").csv(ds)
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") {
+ val msg = intercept[SparkException] {
+ csv.collect()
+ }.getCause.getMessage
+ assert(msg.contains("Fail to parse"))
+ }
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") {
+ checkAnswer(csv, Row(Timestamp.valueOf("2020-01-27 20:06:11.847")))
+ }
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") {
+ checkAnswer(csv, Row(null))
+ }
+ }
}
class CSVv1Suite extends CSVSuite {
@@ -2327,5 +2347,5 @@ class CSVLegacyTimeParserSuite extends CSVSuite {
override protected def sparkConf: SparkConf =
super
.sparkConf
- .set(SQLConf.LEGACY_TIME_PARSER_ENABLED, true)
+ .set(SQLConf.LEGACY_TIME_PARSER_POLICY, "legacy")
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 26d600e..917da5e 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2669,6 +2669,26 @@ abstract class JsonSuite extends QueryTest with
SharedSparkSession with TestJson
Date.valueOf("2020-1-12"),
Date.valueOf(LocalDate.ofEpochDay(12345))))
}
+
+ test("exception mode for parsing date/timestamp string") {
+ val ds = Seq("{'t': '2020-01-27T20:06:11.847-0800'}").toDS()
+ val json = spark.read
+ .schema("t timestamp")
+ .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSz")
+ .json(ds)
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") {
+ val msg = intercept[SparkException] {
+ json.collect()
+ }.getCause.getMessage
+ assert(msg.contains("Fail to parse"))
+ }
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") {
+ checkAnswer(json, Row(Timestamp.valueOf("2020-01-27 20:06:11.847")))
+ }
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") {
+ checkAnswer(json, Row(null))
+ }
+ }
}
class JsonV1Suite extends JsonSuite {
@@ -2689,5 +2709,5 @@ class JsonLegacyTimeParserSuite extends JsonSuite {
override protected def sparkConf: SparkConf =
super
.sparkConf
- .set(SQLConf.LEGACY_TIME_PARSER_ENABLED, true)
+ .set(SQLConf.LEGACY_TIME_PARSER_POLICY, "legacy")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]