This is an automated email from the ASF dual-hosted git repository.
MaxGekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e3587a414caa [SPARK-57033][SQL] Add java.time LocalDateTime/Instant
conversion and Dataset roundtrip for nanosecond timestamps
e3587a414caa is described below
commit e3587a414caa24900e14ac72f550ab5249817c7a
Author: Maxim Gekk <[email protected]>
AuthorDate: Sun May 31 19:28:14 2026 +0200
[SPARK-57033][SQL] Add java.time LocalDateTime/Instant conversion and
Dataset roundtrip for nanosecond timestamps
### What changes were proposed in this pull request?
This PR wires `java.time.LocalDateTime` / `java.time.Instant` through the
encoder, converter and Dataset stack for the nanosecond-capable timestamp types
`TimestampNTZNanosType(p)` and `TimestampLTZNanosType(p)` introduced by
SPARK-56981, so that `spark.createDataFrame(rows, schema).collect()` over a
schema declaring a nanos timestamp column preserves full nanosecond precision
end-to-end.
Concretely:
- Four new `DateTimeUtils` helpers: `localDateTimeToTimestampNanos`,
`timestampNanosToLocalDateTime`, `instantToTimestampNanos`,
`timestampNanosToInstant`. The integral micro part reuses `instantToMicros` /
`localDateTimeToMicros` (and its `MIN_SECONDS` guard); the sub-micro nanos are
kept as `nanosWithinMicro` in `[0, 999]`.
- Two new precision-aware `AgnosticEncoder` leaves:
`LocalDateTimeNanosEncoder(precision)` and `InstantNanosEncoder(precision)`,
with `RowEncoder.encoderForDataType` mapping the nanos data types to them.
- New `CatalystTypeConverters`, `SerializerBuildHelper` and
`DeserializerBuildHelper` entries dispatching on the new encoders / data types.
`TimestampLTZNanosType` always maps to `Instant`, independent of
`spark.sql.datetime.java8API.enabled`.
- `EncoderUtils.dataTypeJavaClassDefault` / `javaBoxedType` learn the new
logical types so `StaticInvoke` codegen picks up `TimestampNanosVal` as the
column's Java class.
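
The `(epochMicros, nanosWithinMicro)` split from the first bullet can be sketched without Spark. The snippet below is a minimal illustration that uses only `java.time`; `NanosSplitSketch`, `Parts`, `split` and `rebuild` are hypothetical names rather than the helpers added by this PR, and overflow is handled by `Math.multiplyExact` / `Math.addExact` instead of Spark's `MIN_SECONDS` guard.

```scala
import java.time.Instant

// Hypothetical standalone sketch, not Spark's DateTimeUtils.
object NanosSplitSketch {
  final case class Parts(epochMicros: Long, nanosWithinMicro: Int)

  def split(instant: Instant): Parts = {
    // getEpochSecond is already floored and getNano is in [0, 999999999],
    // so this sum is the floor of the timestamp expressed in microseconds.
    val epochMicros = Math.addExact(
      Math.multiplyExact(instant.getEpochSecond, 1000000L),
      (instant.getNano / 1000).toLong)
    // The last three decimal digits of getNano are the sub-micro remainder.
    Parts(epochMicros, instant.getNano % 1000)
  }

  def rebuild(parts: Parts): Instant = {
    // floorDiv / floorMod keep the micro-of-second non-negative before the epoch.
    val seconds = Math.floorDiv(parts.epochMicros, 1000000L)
    val microsOfSecond = Math.floorMod(parts.epochMicros, 1000000L)
    Instant.ofEpochSecond(seconds, microsOfSecond * 1000L + parts.nanosWithinMicro)
  }
}
```

Because the remainder is always taken from the non-negative `getNano`, it stays in `[0, 999]` for pre-epoch values as well, which is why rebuilding only needs to add it back.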
### Why are the changes needed?
`SPARK-56981` added physical row storage for nanosecond timestamps, but
there was no conversion layer from the external Java types. As a result, even
with the nanos schema explicitly declared, Dataset create -> internal row ->
collect roundtrips silently truncated to micros:
```scala
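// Requires spark.sql.timestampNanosTypes.enabled=true (preview feature).
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val ldt = java.time.LocalDateTime.parse("2019-02-26T16:56:00.123456789")
val schema = new StructType().add("t", TimestampNTZNanosType(9))
spark.createDataFrame(java.util.Arrays.asList(Row(ldt)),
  schema).collect()(0).get(0)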
// before this PR: 2019-02-26T16:56:00.123456 (last 3 digits dropped)
// after this PR: 2019-02-26T16:56:00.123456789 (preserved)
```
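
To make the lost digits concrete, the following sketch reproduces the "before" value with plain `java.time`. It only illustrates what a microsecond-resolution store can keep; it is not how Spark performs the truncation internally, and `MicroTruncationSketch` is a hypothetical name.

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

// Hypothetical illustration of microsecond truncation, independent of Spark.
object MicroTruncationSketch {
  val ldt: LocalDateTime = LocalDateTime.parse("2019-02-26T16:56:00.123456789")
  // Dropping everything below one microsecond discards the trailing "789".
  val truncated: LocalDateTime = ldt.truncatedTo(ChronoUnit.MICROS)
  // Number of nanoseconds that a micro-only roundtrip would lose.
  val lostNanos: Int = ldt.getNano - truncated.getNano
}
```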
This change is required for the rest of the nanos-timestamp SPIP
(SPARK-56822) to be useful from the DataFrame / Dataset API.
### Does this PR introduce _any_ user-facing change?
Yes, but only for the new `TimestampNTZNanosType` / `TimestampLTZNanosType`
(preview feature, gated by `spark.sql.timestampNanosTypes.enabled`):
- A `Dataset[Row]` whose schema declares a nanos timestamp column now
accepts `LocalDateTime` (NTZ Nanos) / `Instant` (LTZ Nanos) and returns them on
`collect` with full nanosecond precision.
- A bare `LocalDateTime` or `Instant` value (without an explicit schema)
still resolves to the existing micro `TimestampNTZType` / `TimestampType`
encoders, so `Dataset[LocalDateTime]` / `Dataset[Instant]` behavior is
unchanged.
Existing micro timestamp behavior is unchanged.
### How was this patch tested?
New unit tests, all passing locally:
- `DateTimeUtilsSuite` - three `SPARK-57033` tests for the helpers
(`LocalDateTime`, `Instant`, randomized roundtrip across the full valid range,
including pre-epoch, epoch, max range, sub-micro digits, and the
`nanosWithinMicro in [0, 999]` invariant).
- `CatalystTypeConvertersSuite` - both directions for
`TimestampNTZNanosType` / `TimestampLTZNanosType` across precisions 7-9, a
flag-independence test for the LTZ Nanos path, and a null-row roundtrip via
`Row` + schema.
- `RowEncoderSuite` - encode/decode tests for both nanos types across all
precisions and both codegen / interpreted paths, plus a Java8 flag-independence
test.
- `DatasetSuite` - end-to-end `spark.createDataFrame(rows,
schema).collect()` with sub-micro fractional digits, a max-range edge value,
and a null row.
- `JavaDatasetSuite` - `testTimestampNanosRowEncoder` exercising
`spark.createDataset(rows, Encoders.row(schema))` from Java.
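
A randomized roundtrip check across precisions 7-9 could look roughly like the sketch below. It is not the code from the suites listed above: `PrecisionRoundtripSketch` and its members are hypothetical, only the `truncate` rule follows the truncation helper shown in this patch, and the expected roundtrip value is computed with plain `java.time` instead of going through Spark.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import scala.util.Random

// Hypothetical standalone sketch of a precision-aware roundtrip expectation.
object PrecisionRoundtripSketch {
  // Precision 9 keeps all three sub-micro digits, 8 zeros the last one,
  // 7 zeros the last two; other values pass through unchanged.
  def truncate(nanosWithinMicro: Int, precision: Int): Int = precision match {
    case 7 => (nanosWithinMicro / 100) * 100
    case 8 => (nanosWithinMicro / 10) * 10
    case _ => nanosWithinMicro
  }

  // Value expected back after a write/read cycle at the given precision.
  def roundtrip(ldt: LocalDateTime, precision: Int): LocalDateTime = {
    val sub = ldt.getNano % 1000
    // The difference is at most 99 ns and never exceeds `sub`, so no borrow occurs.
    ldt.minusNanos((sub - truncate(sub, precision)).toLong)
  }

  // Random value with pre-epoch and post-epoch seconds (roughly 1901 to 2038).
  def randomLocalDateTime(rnd: Random): LocalDateTime =
    LocalDateTime.ofEpochSecond(rnd.nextInt().toLong, rnd.nextInt(1000000000), ZoneOffset.UTC)
}
```

A property-style test would then assert that the result is a multiple of the precision step, is never later than the input, and differs from it by less than one step.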
Regression check across adjacent suites (no failures):
- `build/sbt 'catalyst/testOnly *DateTimeUtilsSuite
*CatalystTypeConvertersSuite *RowEncoderSuite'` - 233/233 pass.
- `build/sbt 'catalyst/testOnly *ExpressionEncoderSuite
*TimestampNanosRowSuite *LiteralExpressionSuite'` - 453/453 pass.
- `build/sbt 'sql/testOnly *DatasetSuite'` - 276/276 pass (221 Scala + 55
Java).
- `./dev/scalastyle` and `./dev/lint-java` clean.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor 2.0 (Claude Opus 4.7)
Closes #56158 from MaxGekk/nanos-java-types.
Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 11 +-
docs/sql-ref-datatypes.md | 2 +-
.../sql/catalyst/encoders/AgnosticEncoder.scala | 8 +
.../spark/sql/catalyst/encoders/RowEncoder.scala | 14 +-
.../sql/catalyst/util/SparkDateTimeUtils.scala | 78 ++++++-
.../sql/catalyst/CatalystTypeConverters.scala | 67 +++++-
.../sql/catalyst/DeserializerBuildHelper.scala | 24 +-
.../spark/sql/catalyst/SerializerBuildHelper.scala | 26 ++-
.../spark/sql/catalyst/encoders/EncoderUtils.scala | 8 +-
.../sql/catalyst/CatalystTypeConvertersSuite.scala | 250 ++++++++++++++++++++-
.../sql/catalyst/encoders/RowEncoderSuite.scala | 72 +++++-
.../sql/catalyst/util/DateTimeUtilsSuite.scala | 148 +++++++++++-
.../org/apache/spark/sql/JavaDatasetSuite.java | 18 ++
.../scala/org/apache/spark/sql/DatasetSuite.scala | 44 ++++
14 files changed, 740 insertions(+), 30 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 91976d21934d..a5e539199b75 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -3757,6 +3757,12 @@
],
"sqlState" : "42K0N"
},
+ "INVALID_EXTERNAL_VALUE" : {
+ "message" : [
+ "The value (<other>) of the type (<otherClass>) cannot be converted to the <dataType> type."
+ ],
+ "sqlState" : "42K0N"
+ },
"INVALID_EXTRACT_BASE_FIELD_TYPE" : {
"message" : [
"Can't extract a value from <base>. Need a complex type [STRUCT, ARRAY, MAP] but got <other>."
@@ -11526,11 +11532,6 @@
"Must be 2 children: <others>"
]
},
- "_LEGACY_ERROR_TEMP_3219" : {
- "message" : [
- "The value (<other>) of the type (<otherClass>) cannot be converted to the <dataType> type."
- ]
- },
"_LEGACY_ERROR_TEMP_3220" : {
"message" : [
"The value (<other>) of the type (<otherClass>) cannot be converted to an array of <elementType>"
diff --git a/docs/sql-ref-datatypes.md b/docs/sql-ref-datatypes.md
index fe1b8724d8d6..e0d36631dec0 100644
--- a/docs/sql-ref-datatypes.md
+++ b/docs/sql-ref-datatypes.md
@@ -54,7 +54,7 @@ Spark SQL and DataFrames support the following data types:
- `TimestampNTZType`: Timestamp without time zone(TIMESTAMP_NTZ). It represents values comprising values of fields year, month, day, hour, minute, and second. All operations are performed without taking any time zone into account.
- Note: TIMESTAMP in Spark is a user-specified alias associated with one of the TIMESTAMP_LTZ and TIMESTAMP_NTZ variations. Users can set the default timestamp type as `TIMESTAMP_LTZ`(default value) or `TIMESTAMP_NTZ` via the configuration `spark.sql.timestampType`.
- - `TimestampNTZNanosType(precision)` / `TimestampLTZNanosType(precision)`: Preview nanosecond-capable variants of `TIMESTAMP_NTZ` and `TIMESTAMP_LTZ` with fractional seconds precision `precision` in `[7, 9]`. Unparameterized `TIMESTAMP`, `TIMESTAMP_NTZ`, and `TIMESTAMP_LTZ` remain microsecond types. Enable the preview feature with `SET spark.sql.timestampNanosTypes.enabled=true;` before using these types in schemas or SQL.
+ - `TimestampNTZNanosType(precision)` / `TimestampLTZNanosType(precision)`: Preview nanosecond-capable variants of `TIMESTAMP_NTZ` and `TIMESTAMP_LTZ` with fractional seconds precision `precision` in `[7, 9]`. Unparameterized `TIMESTAMP`, `TIMESTAMP_NTZ`, and `TIMESTAMP_LTZ` remain microsecond types. In schema-driven Dataset/DataFrame conversion, Spark maps `TimestampNTZNanosType` to `java.time.LocalDateTime` and `TimestampLTZNanosType` to `java.time.Instant`; values with more sub-micro [...]
* Interval types
- `YearMonthIntervalType(startField, endField)`: Represents a year-month interval which is made up of a contiguous subset of the following fields:
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
index 20949c188cb8..57c15de4f0db 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
@@ -257,6 +257,14 @@ object AgnosticEncoders {
case class InstantEncoder(override val lenientSerialization: Boolean)
extends LeafEncoder[Instant](TimestampType)
case object LocalDateTimeEncoder extends LeafEncoder[LocalDateTime](TimestampNTZType)
+ // Nanosecond-precision counterparts of `LocalDateTimeEncoder` / `InstantEncoder(false)`.
+ // They are used by `RowEncoder` when the schema declares a `TimestampNTZNanosType(p)` or
+ // `TimestampLTZNanosType(p)` column, so Dataset create/collect roundtrips preserve full
+ // nanosecond precision. See SPARK-57033.
+ case class LocalDateTimeNanosEncoder(precision: Int)
+ extends LeafEncoder[LocalDateTime](TimestampNTZNanosType(precision))
+ case class InstantNanosEncoder(precision: Int)
+ extends LeafEncoder[Instant](TimestampLTZNanosType(precision))
case object LocalTimeEncoder extends LeafEncoder[LocalTime](TimeType())
case class SparkDecimalEncoder(dt: DecimalType) extends LeafEncoder[Decimal](dt)
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index bad673672188..705d5d8f11b1 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -21,8 +21,8 @@ import scala.collection.mutable
import scala.reflect.classTag
import org.apache.spark.sql.{AnalysisException, Row}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, NullEncoder, RowEncoder => Agnosti [...]
-import org.apache.spark.sql.errors.DataTypeErrorsBase
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, GeographyEncoder, GeometryEncoder, InstantEncoder, InstantNanosEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalDateTimeNanosEncoder, LocalTimeEncoder [...]
+import org.apache.spark.sql.errors.{DataTypeErrors, DataTypeErrorsBase}
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.ops.TypeApiOps
@@ -50,6 +50,8 @@ import org.apache.spark.util.ArrayImplicits._
* TimestampType -> java.time.Instant if spark.sql.datetime.java8API.enabled is true
*
* TimestampNTZType -> java.time.LocalDateTime
+ * TimestampNTZNanosType -> java.time.LocalDateTime
+ * TimestampLTZNanosType -> java.time.Instant
* TimeType -> java.time.LocalTime
*
* DayTimeIntervalType -> java.time.Duration
@@ -97,6 +99,14 @@ object RowEncoder extends DataTypeErrorsBase {
case TimestampType if SqlApiConf.get.datetimeJava8ApiEnabled => InstantEncoder(lenient)
case TimestampType => TimestampEncoder(lenient)
case TimestampNTZType => LocalDateTimeEncoder
+ // Nano timestamp types intentionally do not honor `lenient`: legacy `java.sql.Timestamp` /
+ // `java.sql.Date` external types are out of scope for nanosecond precision (SPARK-57033).
+ case t: TimestampNTZNanosType =>
+ DataTypeErrors.checkTimestampNanosTypesEnabled()
+ LocalDateTimeNanosEncoder(t.precision)
+ case t: TimestampLTZNanosType =>
+ DataTypeErrors.checkTimestampNanosTypesEnabled()
+ InstantNanosEncoder(t.precision)
case DateType if SqlApiConf.get.datetimeJava8ApiEnabled => LocalDateEncoder(lenient)
case DateType => DateEncoder(lenient)
case _: TimeType => LocalTimeEncoder
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
index 9684737a2286..597a96c548ce 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseGregorianToJulianMicros, rebaseJulianToGregorianDays, rebaseJulianToGregorianMicros}
import org.apache.spark.sql.errors.ExecutionErrors
import org.apache.spark.sql.types.{DateType, TimestampType, TimeType}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{TimestampNanosVal, UTF8String}
import org.apache.spark.util.SparkClassUtils
trait SparkDateTimeUtils {
@@ -208,6 +208,82 @@ trait SparkDateTimeUtils {
instantToMicros(localDateTime.toInstant(ZoneOffset.UTC))
}
+ /**
+ * Truncates the sub-microsecond nanosecond part to the given timestamp precision `p` in [7, 9].
+ * Precision 9 keeps all three digits, 8 zeros the last digit, 7 zeros the last two.
+ *
+ * The input is the already-extracted `nanosWithinMicro` component (`0..999`), so truncation is
+ * independent of the epoch sign of the original timestamp value.
+ *
+ * Precisions outside `[7, 9]` are passed through unchanged because the surrounding timestamp
+ * nanos types validate the bound.
+ */
+ private def truncateNanosWithinMicroToPrecision(nanosWithinMicro: Int, precision: Int): Int = {
+ precision match {
+ case 7 => (nanosWithinMicro / 100) * 100
+ case 8 => (nanosWithinMicro / 10) * 10
+ case _ => nanosWithinMicro
+ }
+ }
+
+ /**
+ * Converts a `java.time.LocalDateTime` into the composite `(epochMicros, nanosWithinMicro)`
+ * pair used by `TimestampNTZNanosType(precision)` (interpreted at UTC). `epochMicros` comes
+ * from [[localDateTimeToMicros]] (which is floor toward `-inf` for the integral micro part);
+ * the last three decimal digits of `localDateTime.getNano` (`[0, 999]`) become
+ * `nanosWithinMicro` after dropping `(9 - precision)` low digits.
+ *
+ * Combined, the result is the floor toward `-inf` of the original nanosecond value rounded down
+ * to the precision step (10^(9 - precision) ns). At `precision = 9` the conversion is lossless
+ * within the valid range; at 7 / 8 the lowest 2 / 1 sub-micro digits are dropped. The same
+ * flooring will be the basis of the future `CAST(... AS TIMESTAMP_NTZ(precision))` rule.
+ */
+ def localDateTimeToTimestampNanos(
+ localDateTime: LocalDateTime,
+ precision: Int): TimestampNanosVal = {
+ val epochMicros = localDateTimeToMicros(localDateTime)
+ val rawNanosWithinMicro = localDateTime.getNano % NANOS_PER_MICROS.toInt
+ val nanosWithinMicro = truncateNanosWithinMicroToPrecision(rawNanosWithinMicro, precision)
+ TimestampNanosVal.fromParts(epochMicros, nanosWithinMicro.toShort)
+ }
+
+ /**
+ * Reverse of [[localDateTimeToTimestampNanos]]: rebuilds a `java.time.LocalDateTime` (at UTC)
+ * from a `TimestampNanosVal`. `nanosWithinMicro` is in `[0, 999]` so `plusNanos` never crosses
+ * the second boundary.
+ */
+ def timestampNanosToLocalDateTime(v: TimestampNanosVal): LocalDateTime = {
+ microsToLocalDateTime(v.epochMicros).plusNanos(v.nanosWithinMicro.toLong)
+ }
+
+ /**
+ * Converts a `java.time.Instant` into the composite `(epochMicros, nanosWithinMicro)` pair used
+ * by `TimestampLTZNanosType(precision)`. `epochMicros` comes from [[instantToMicros]] (floor
+ * toward `-inf` for the integral micro part); the last three decimal digits of
+ * `instant.getNano` (`[0, 999]`) become `nanosWithinMicro` after dropping `(9 - precision)` low
+ * digits.
+ *
+ * Combined, the result is the floor toward `-inf` of the original nanosecond value rounded down
+ * to the precision step (10^(9 - precision) ns). At `precision = 9` the conversion is lossless
+ * within the valid range; at 7 / 8 the lowest 2 / 1 sub-micro digits are dropped. The same
+ * flooring will be the basis of the future `CAST(... AS TIMESTAMP_LTZ(precision))` rule.
+ */
+ def instantToTimestampNanos(instant: Instant, precision: Int): TimestampNanosVal = {
+ val epochMicros = instantToMicros(instant)
+ val rawNanosWithinMicro = instant.getNano % NANOS_PER_MICROS.toInt
+ val nanosWithinMicro = truncateNanosWithinMicroToPrecision(rawNanosWithinMicro, precision)
+ TimestampNanosVal.fromParts(epochMicros, nanosWithinMicro.toShort)
+ }
+
+ /**
+ * Reverse of [[instantToTimestampNanos]]: rebuilds a `java.time.Instant` from a
+ * `TimestampNanosVal`. `nanosWithinMicro` is in `[0, 999]` so `plusNanos` never crosses the
+ * second boundary.
+ */
+ def timestampNanosToInstant(v: TimestampNanosVal): Instant = {
+ microsToInstant(v.epochMicros).plusNanos(v.nanosWithinMicro.toLong)
+ }
+
/**
* Converts the local date to the number of days since 1970-01-01.
*/
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index be66f851e361..de131f1d58e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DayTimeIntervalType._
import org.apache.spark.sql.types.YearMonthIntervalType._
-import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, UTF8String}
+import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, TimestampNanosVal, UTF8String}
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.collection.Utils
@@ -88,6 +88,8 @@ object CatalystTypeConverters {
case TimestampType if SQLConf.get.datetimeJava8ApiEnabled => InstantConverter
case TimestampType => TimestampConverter
case TimestampNTZType => TimestampNTZConverter
+ case t: TimestampNTZNanosType => new TimestampNTZNanosConverter(t)
+ case t: TimestampLTZNanosType => new TimestampLTZNanosConverter(t)
case dt: DecimalType => new DecimalConverter(dt)
case BooleanType => BooleanConverter
case ByteType => ByteConverter
@@ -298,7 +300,7 @@ object CatalystTypeConverters {
}
new GenericInternalRow(ar)
case other => throw new SparkIllegalArgumentException(
- errorClass = "_LEGACY_ERROR_TEMP_3219",
+ errorClass = "INVALID_EXTERNAL_VALUE",
messageParameters = scala.collection.immutable.Map(
"other" -> other.toString,
"otherClass" -> other.getClass.getCanonicalName,
@@ -357,7 +359,7 @@ object CatalystTypeConverters {
case chr: Char => UTF8String.fromString(chr.toString)
case ac: Array[Char] => UTF8String.fromString(String.valueOf(ac))
case other => throw new SparkIllegalArgumentException(
- errorClass = "_LEGACY_ERROR_TEMP_3219",
+ errorClass = "INVALID_EXTERNAL_VALUE",
messageParameters = scala.collection.immutable.Map(
"other" -> other.toString,
"otherClass" -> other.getClass.getCanonicalName,
@@ -383,7 +385,7 @@ object CatalystTypeConverters {
case g: org.apache.spark.sql.types.Geometry if SQLConf.get.geospatialEnabled =>
STUtils.serializeGeomFromWKB(g, dataType)
case other => throw new SparkIllegalArgumentException(
- errorClass = "_LEGACY_ERROR_TEMP_3219",
+ errorClass = "INVALID_EXTERNAL_VALUE",
messageParameters = scala.collection.immutable.Map(
"other" -> other.toString,
"otherClass" -> other.getClass.getCanonicalName,
@@ -408,7 +410,7 @@ object CatalystTypeConverters {
case g: org.apache.spark.sql.types.Geography if SQLConf.get.geospatialEnabled =>
STUtils.serializeGeogFromWKB(g, dataType)
case other => throw new SparkIllegalArgumentException(
- errorClass = "_LEGACY_ERROR_TEMP_3219",
+ errorClass = "INVALID_EXTERNAL_VALUE",
messageParameters = scala.collection.immutable.Map(
"other" -> other.toString,
"otherClass" -> other.getClass.getCanonicalName,
@@ -432,7 +434,7 @@ object CatalystTypeConverters {
case d: Date => DateTimeUtils.fromJavaDate(d)
case l: LocalDate => DateTimeUtils.localDateToDays(l)
case other => throw new SparkIllegalArgumentException(
- errorClass = "_LEGACY_ERROR_TEMP_3219",
+ errorClass = "INVALID_EXTERNAL_VALUE",
messageParameters = scala.collection.immutable.Map(
"other" -> other.toString,
"otherClass" -> other.getClass.getCanonicalName,
@@ -472,7 +474,7 @@ object CatalystTypeConverters {
case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
case i: Instant => DateTimeUtils.instantToMicros(i)
case other => throw new SparkIllegalArgumentException(
- errorClass = "_LEGACY_ERROR_TEMP_3219",
+ errorClass = "INVALID_EXTERNAL_VALUE",
messageParameters = scala.collection.immutable.Map(
"other" -> other.toString,
"otherClass" -> other.getClass.getCanonicalName,
@@ -500,7 +502,7 @@ object CatalystTypeConverters {
override def toCatalystImpl(scalaValue: Any): Any = scalaValue match {
case l: LocalDateTime => DateTimeUtils.localDateTimeToMicros(l)
case other => throw new SparkIllegalArgumentException(
- errorClass = "_LEGACY_ERROR_TEMP_3219",
+ errorClass = "INVALID_EXTERNAL_VALUE",
messageParameters = scala.collection.immutable.Map(
"other" -> other.toString,
"otherClass" -> other.getClass.getCanonicalName,
@@ -515,6 +517,50 @@ object CatalystTypeConverters {
DateTimeUtils.microsToLocalDateTime(row.getLong(column))
}
+ private class TimestampNTZNanosConverter(dataType: TimestampNTZNanosType)
+ extends CatalystTypeConverter[Any, LocalDateTime, TimestampNanosVal] {
+ override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = scalaValue match {
+ case l: LocalDateTime => DateTimeUtils.localDateTimeToTimestampNanos(l, dataType.precision)
+ case other => throw new SparkIllegalArgumentException(
+ errorClass = "INVALID_EXTERNAL_VALUE",
+ messageParameters = scala.collection.immutable.Map(
+ "other" -> other.toString,
+ "otherClass" -> other.getClass.getCanonicalName,
+ "dataType" -> dataType.sql))
+ }
+
+ override def toScala(catalystValue: TimestampNanosVal): LocalDateTime =
+ if (catalystValue == null) null
+ else DateTimeUtils.timestampNanosToLocalDateTime(catalystValue)
+
+ override def toScalaImpl(row: InternalRow, column: Int): LocalDateTime =
+ DateTimeUtils.timestampNanosToLocalDateTime(row.getTimestampNTZNanos(column))
+ }
+
+ // Always maps `TimestampLTZNanosType` to `java.time.Instant`. Unlike micro `TimestampType`,
+ // the mapping does not consult `spark.sql.datetime.java8API.enabled`: the nanos LTZ type is
+ // post-Java-8 and the legacy `java.sql.Timestamp` external type is intentionally out of scope
+ // here. See SPARK-57033.
+ private class TimestampLTZNanosConverter(dataType: TimestampLTZNanosType)
+ extends CatalystTypeConverter[Any, Instant, TimestampNanosVal] {
+ override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = scalaValue match {
+ case i: Instant => DateTimeUtils.instantToTimestampNanos(i, dataType.precision)
+ case other => throw new SparkIllegalArgumentException(
+ errorClass = "INVALID_EXTERNAL_VALUE",
+ messageParameters = scala.collection.immutable.Map(
+ "other" -> other.toString,
+ "otherClass" -> other.getClass.getCanonicalName,
+ "dataType" -> dataType.sql))
+ }
+
+ override def toScala(catalystValue: TimestampNanosVal): Instant =
+ if (catalystValue == null) null
+ else DateTimeUtils.timestampNanosToInstant(catalystValue)
+
+ override def toScalaImpl(row: InternalRow, column: Int): Instant =
+ DateTimeUtils.timestampNanosToInstant(row.getTimestampLTZNanos(column))
+ }
+
private class DecimalConverter(dataType: DecimalType)
extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] {
@@ -527,7 +573,7 @@ object CatalystTypeConverters {
case d: JavaBigInteger => Decimal(d)
case d: Decimal => d
case other => throw new SparkIllegalArgumentException(
- errorClass = "_LEGACY_ERROR_TEMP_3219",
+ errorClass = "INVALID_EXTERNAL_VALUE",
messageParameters = scala.collection.immutable.Map(
"other" -> other.toString,
"otherClass" -> other.getClass.getCanonicalName,
@@ -655,6 +701,9 @@ object CatalystTypeConverters {
case ld: LocalDate => LocalDateConverter.toCatalyst(ld)
case t: LocalTime => TimeConverter.toCatalyst(t)
case t: Timestamp => TimestampConverter.toCatalyst(t)
+ // SPARK-57033: schema-less convertToCatalyst keeps bare `Instant` / `LocalDateTime` on the
+ // microsecond converters. The nanosecond path is schema-driven only - users opt in via an
+ // explicit `TimestampLTZNanosType` / `TimestampNTZNanosType` column in the schema.
case i: Instant => InstantConverter.toCatalyst(i)
case l: LocalDateTime => TimestampNTZConverter.toCatalyst(l)
case d: BigDecimal =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
index 8bd162afd56d..17ba4fe4203b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.{expressions => exprs}
import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedExtractValue}
import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders, AgnosticExpressionPathEncoder, Codec, JavaSerializationCodec, KryoSerializationCodec}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedLeafEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, Primi [...]
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedLeafEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, GeometryEncoder, InstantEncoder, InstantNanosEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalDateTimeNanosEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, P [...]
import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{dataTypeForClass, externalDataTypeFor, isNativeEncoder}
import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField, IsNull, Literal, MapKeys, MapValues, UpCast}
import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, CreateExternalRow, DecodeUsingSerializer, InitializeJavaBean, Invoke, NewInstance, StaticInvoke, UnresolvedCatalystToExternalMap, UnresolvedMapObjects, WrapOption}
@@ -176,6 +176,24 @@ object DeserializerBuildHelper {
returnNullable = false)
}
+ def createDeserializerForLocalDateTimeNanos(path: Expression): Expression = {
+ StaticInvoke(
+ DateTimeUtils.getClass,
+ ObjectType(classOf[java.time.LocalDateTime]),
+ "timestampNanosToLocalDateTime",
+ path :: Nil,
+ returnNullable = false)
+ }
+
+ def createDeserializerForInstantNanos(path: Expression): Expression = {
+ StaticInvoke(
+ DateTimeUtils.getClass,
+ ObjectType(classOf[java.time.Instant]),
+ "timestampNanosToInstant",
+ path :: Nil,
+ returnNullable = false)
+ }
+
def createDeserializerForLocalTime(path: Expression): Expression = {
StaticInvoke(
DateTimeUtils.getClass,
@@ -356,6 +374,10 @@ object DeserializerBuildHelper {
createDeserializerForInstant(path)
case LocalDateTimeEncoder =>
createDeserializerForLocalDateTime(path)
+ case _: LocalDateTimeNanosEncoder =>
+ createDeserializerForLocalDateTimeNanos(path)
+ case _: InstantNanosEncoder =>
+ createDeserializerForInstantNanos(path)
case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled =>
throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError()
case LocalTimeEncoder =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
index 37a4efc65739..72337a4b2185 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
@@ -22,7 +22,7 @@ import scala.language.existentials
import org.apache.spark.sql.catalyst.{expressions => exprs}
import org.apache.spark.sql.catalyst.DeserializerBuildHelper.expressionWithNullSafety
import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders, AgnosticExpressionPathEncoder, Codec, JavaSerializationCodec, KryoSerializationCodec}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, Opt [...]
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, GeometryEncoder, InstantEncoder, InstantNanosEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalDateTim [...]
import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{externalDataTypeFor, isNativeEncoder, lenientExternalDataTypeFor}
import org.apache.spark.sql.catalyst.expressions.{BoundReference, CheckOverflow, CreateNamedStruct, Expression, IsNull, KnownNotNull, Literal, UnsafeArrayData}
import org.apache.spark.sql.catalyst.expressions.objects._
@@ -166,6 +166,28 @@ object SerializerBuildHelper {
returnNullable = false)
}
+ def createSerializerForLocalDateTimeNanos(
+ inputObject: Expression,
+ precision: Int): Expression = {
+ StaticInvoke(
+ DateTimeUtils.getClass,
+ TimestampNTZNanosType(precision),
+ "localDateTimeToTimestampNanos",
+ inputObject :: Literal(precision) :: Nil,
+ returnNullable = false)
+ }
+
+ def createSerializerForInstantNanos(
+ inputObject: Expression,
+ precision: Int): Expression = {
+ StaticInvoke(
+ DateTimeUtils.getClass,
+ TimestampLTZNanosType(precision),
+ "instantToTimestampNanos",
+ inputObject :: Literal(precision) :: Nil,
+ returnNullable = false)
+ }
+
def createSerializerForJavaLocalDate(inputObject: Expression): Expression = {
StaticInvoke(
DateTimeUtils.getClass,
@@ -376,6 +398,8 @@ object SerializerBuildHelper {
case TimestampEncoder(false) => createSerializerForSqlTimestamp(input)
case InstantEncoder(false) => createSerializerForJavaInstant(input)
case LocalDateTimeEncoder => createSerializerForLocalDateTime(input)
+ case LocalDateTimeNanosEncoder(p) => createSerializerForLocalDateTimeNanos(input, p)
+ case InstantNanosEncoder(p) => createSerializerForInstantNanos(input, p)
case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled =>
throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError()
case LocalTimeEncoder => createSerializerForLocalTime(input)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala
index 0fce96c15997..f65203aed8c7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.types.{PhysicalBinaryType, PhysicalIntegerType, PhysicalLongType}
import org.apache.spark.sql.catalyst.types.ops.TypeOps
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
-import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType}
-import org.apache.spark.unsafe.types.{CalendarInterval, GeographyVal, GeometryVal, UTF8String, VariantVal}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampLTZNanosType, TimestampNTZNanosType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType}
+import org.apache.spark.unsafe.types.{CalendarInterval, GeographyVal, GeometryVal, TimestampNanosVal, UTF8String, VariantVal}
/**
* :: DeveloperApi ::
@@ -107,6 +107,8 @@ object EncoderUtils {
case _: DayTimeIntervalType => classOf[PhysicalLongType.InternalType]
case _: YearMonthIntervalType => classOf[PhysicalIntegerType.InternalType]
case _: TimeType => classOf[PhysicalLongType.InternalType]
+ case _: TimestampNTZNanosType => classOf[TimestampNanosVal]
+ case _: TimestampLTZNanosType => classOf[TimestampNanosVal]
case _: StringType => classOf[UTF8String]
case _: StructType => classOf[InternalRow]
case _: ArrayType => classOf[ArrayData]
@@ -126,6 +128,8 @@ object EncoderUtils {
case BinaryType => classOf[Array[Byte]]
case _: StringType => classOf[UTF8String]
case CalendarIntervalType => classOf[CalendarInterval]
+ case _: TimestampNTZNanosType => classOf[TimestampNanosVal]
+ case _: TimestampLTZNanosType => classOf[TimestampNanosVal]
case _: StructType => classOf[InternalRow]
case _: ArrayType => classOf[ArrayData]
case _: MapType => classOf[MapData]
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
index 222465d82c02..dbb9bc04914a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DayTimeIntervalType._
import org.apache.spark.sql.types.YearMonthIntervalType._
-import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, UTF8String}
+import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, TimestampNanosVal, UTF8String}
class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
@@ -109,7 +109,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
exception = intercept[SparkIllegalArgumentException] {
CatalystTypeConverters.createToCatalystConverter(structType)("test")
},
- condition = "_LEGACY_ERROR_TEMP_3219",
+ condition = "INVALID_EXTERNAL_VALUE",
parameters = Map(
"other" -> "test",
"otherClass" -> "java.lang.String",
@@ -149,7 +149,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
exception = intercept[SparkIllegalArgumentException] {
CatalystTypeConverters.createToCatalystConverter(decimalType)("test")
},
- condition = "_LEGACY_ERROR_TEMP_3219",
+ condition = "INVALID_EXTERNAL_VALUE",
parameters = Map(
"other" -> "test",
"otherClass" -> "java.lang.String",
@@ -169,7 +169,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
exception = intercept[SparkIllegalArgumentException] {
CatalystTypeConverters.createToCatalystConverter(StringType)(0.1)
},
- condition = "_LEGACY_ERROR_TEMP_3219",
+ condition = "INVALID_EXTERNAL_VALUE",
parameters = Map(
"other" -> "0.1",
"otherClass" -> "java.lang.Double",
@@ -250,6 +250,244 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
}
}
+ test("SPARK-57033: converting java.time.LocalDateTime to TimestampNTZNanosType") {
+ Seq(
+ "0001-01-01T00:00:00",
+ "1582-10-02T01:02:03.04",
+ "1582-12-31T23:59:59.999999999",
+ "1970-01-01T00:00:00.000000001",
+ "1972-12-31T23:59:59.123456789",
+ "2019-02-26T16:56:00.123456789",
+ "9999-12-31T23:59:59.999999999").foreach { text =>
+ val input = LocalDateTime.parse(text)
+ Seq(7, 8, 9).foreach { p =>
+ val dt = TimestampNTZNanosType(p)
+ val result = CatalystTypeConverters.createToCatalystConverter(dt)(input)
+ val expected = DateTimeUtils.localDateTimeToTimestampNanos(input, p)
+ assert(result === expected)
+ }
+ }
+ }
+
+ test("SPARK-57033: converting TimestampNTZNanosType to java.time.LocalDateTime") {
+ Seq(
+ "1582-10-02T01:02:03.04",
+ "1582-12-31T23:59:59.999999999",
+ "1970-01-01T00:00:00.000000001",
+ "1972-12-31T23:59:59.123456789",
+ "2019-02-26T16:56:00.123456789",
+ "9999-12-31T23:59:59.999999999").foreach { text =>
+ val ldt = LocalDateTime.parse(text)
+ val v = DateTimeUtils.localDateTimeToTimestampNanos(ldt, precision = 9)
+ Seq(7, 8, 9).foreach { p =>
+ val dt = TimestampNTZNanosType(p)
+ assert(CatalystTypeConverters.createToScalaConverter(dt)(v) === ldt)
+ }
+ }
+ }
+
+ test("SPARK-57033: converting java.time.Instant to TimestampLTZNanosType") {
+ Seq(
+ "0001-01-01T00:00:00Z",
+ "1582-10-02T01:02:03.04Z",
+ "1582-12-31T23:59:59.999999999Z",
+ "1970-01-01T00:00:00.000000001Z",
+ "1972-12-31T23:59:59.123456789Z",
+ "2019-02-26T16:56:00.123456789Z",
+ "9999-12-31T23:59:59.999999999Z").foreach { text =>
+ val input = Instant.parse(text)
+ Seq(7, 8, 9).foreach { p =>
+ val dt = TimestampLTZNanosType(p)
+ val result = CatalystTypeConverters.createToCatalystConverter(dt)(input)
+ val expected = DateTimeUtils.instantToTimestampNanos(input, p)
+ assert(result === expected)
+ }
+ }
+ }
+
+ test("SPARK-57033: converting TimestampLTZNanosType to java.time.Instant") {
+ Seq(
+ "1582-10-02T01:02:03.04Z",
+ "1582-12-31T23:59:59.999999999Z",
+ "1970-01-01T00:00:00.000000001Z",
+ "1972-12-31T23:59:59.123456789Z",
+ "2019-02-26T16:56:00.123456789Z",
+ "9999-12-31T23:59:59.999999999Z").foreach { text =>
+ val instant = Instant.parse(text)
+ val v = DateTimeUtils.instantToTimestampNanos(instant, precision = 9)
+ Seq(7, 8, 9).foreach { p =>
+ val dt = TimestampLTZNanosType(p)
+ assert(CatalystTypeConverters.createToScalaConverter(dt)(v) === instant)
+ }
+ }
+ }
+
+ test("SPARK-57033: TimestampLTZNanosType -> Instant ignores java8 API flag") {
+ val instant = Instant.parse("2019-02-26T16:56:00.123456789Z")
+ val v = DateTimeUtils.instantToTimestampNanos(instant, precision = 9)
+ val dt = TimestampLTZNanosType()
+ Seq("true", "false").foreach { flag =>
+ withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> flag) {
+ assert(CatalystTypeConverters.createToCatalystConverter(dt)(instant) === v)
+ assert(CatalystTypeConverters.createToScalaConverter(dt)(v) === instant)
+ }
+ }
+ }
+
+ test("SPARK-57033: TimestampNanosConverter null handling in rows") {
+ val schema = StructType(
+ StructField("ntz", TimestampNTZNanosType(9), nullable = true) ::
+ StructField("ltz", TimestampLTZNanosType(9), nullable = true) :: Nil)
+ val toCat = CatalystTypeConverters.createToCatalystConverter(schema)
+ val toScala = CatalystTypeConverters.createToScalaConverter(schema)
+ // Reference value to ensure non-null cells are kept as-is.
+ val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789")
+ val instant = Instant.parse("2019-02-26T16:56:00.987654321Z")
+ val row = Row(ldt, instant)
+ val catalystRow = toCat(row).asInstanceOf[InternalRow]
+ assert(catalystRow.getTimestampNTZNanos(0) ===
+ DateTimeUtils.localDateTimeToTimestampNanos(ldt, precision = 9))
+ assert(catalystRow.getTimestampLTZNanos(1) ===
+ DateTimeUtils.instantToTimestampNanos(instant, precision = 9))
+ assert(toScala(catalystRow) === row)
+ // Null row.
+ val nullRow = Row.fromSeq(Seq(null, null))
+ assert(toScala(toCat(nullRow)) === nullRow)
+ }
+
+ test("SPARK-57033: TimestampNTZNanosType converter truncates sub-micro to precision") {
+ val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789")
+ val negativeEpochLdt = LocalDateTime.parse("1969-12-31T23:59:59.123456789")
+ val cases = Map(7 -> 700, 8 -> 780, 9 -> 789)
+ Seq(ldt, negativeEpochLdt).foreach { input =>
+ cases.foreach { case (p, expectedNanosWithinMicro) =>
+ val dt = TimestampNTZNanosType(p)
+ val v = CatalystTypeConverters.createToCatalystConverter(dt)(input)
+ .asInstanceOf[TimestampNanosVal]
+ assert(v.nanosWithinMicro === expectedNanosWithinMicro,
+ s"input=$input, precision=$p: expected $expectedNanosWithinMicro, " +
+ s"got ${v.nanosWithinMicro}")
+ }
+ }
+ }
+
+ test("SPARK-57033: TimestampLTZNanosType converter truncates sub-micro to precision") {
+ val instant = Instant.parse("2019-02-26T16:56:00.123456789Z")
+ val negativeEpochInstant = Instant.parse("1969-12-31T23:59:59.123456789Z")
+ val cases = Map(7 -> 700, 8 -> 780, 9 -> 789)
+ Seq(instant, negativeEpochInstant).foreach { input =>
+ cases.foreach { case (p, expectedNanosWithinMicro) =>
+ val dt = TimestampLTZNanosType(p)
+ val v = CatalystTypeConverters.createToCatalystConverter(dt)(input)
+ .asInstanceOf[TimestampNanosVal]
+ assert(v.nanosWithinMicro === expectedNanosWithinMicro,
+ s"input=$input, precision=$p: expected $expectedNanosWithinMicro, " +
+ s"got ${v.nanosWithinMicro}")
+ }
+ }
+ }
+
+ test("SPARK-57033: TimestampNTZNanosType converter rejects wrong external type") {
+ val dt = TimestampNTZNanosType(9)
+ checkError(
+ exception = intercept[SparkIllegalArgumentException] {
+ CatalystTypeConverters.createToCatalystConverter(dt)("not-a-LocalDateTime")
+ },
+ condition = "INVALID_EXTERNAL_VALUE",
+ parameters = Map(
+ "other" -> "not-a-LocalDateTime",
+ "otherClass" -> "java.lang.String",
+ "dataType" -> "TIMESTAMP_NTZ(9)"))
+ // An `Instant` is also not accepted - the NTZ nano converter is wall-clock only.
+ val instant = Instant.parse("2019-02-26T16:56:00.123456789Z")
+ checkError(
+ exception = intercept[SparkIllegalArgumentException] {
+ CatalystTypeConverters.createToCatalystConverter(dt)(instant)
+ },
+ condition = "INVALID_EXTERNAL_VALUE",
+ parameters = Map(
+ "other" -> instant.toString,
+ "otherClass" -> "java.time.Instant",
+ "dataType" -> "TIMESTAMP_NTZ(9)"))
+ }
+
+ test("SPARK-57033: TimestampLTZNanosType converter rejects wrong external type") {
+ val dt = TimestampLTZNanosType(9)
+ checkError(
+ exception = intercept[SparkIllegalArgumentException] {
+ CatalystTypeConverters.createToCatalystConverter(dt)("not-an-Instant")
+ },
+ condition = "INVALID_EXTERNAL_VALUE",
+ parameters = Map(
+ "other" -> "not-an-Instant",
+ "otherClass" -> "java.lang.String",
+ "dataType" -> "TIMESTAMP_LTZ(9)"))
+ // A `LocalDateTime` is also not accepted - LTZ requires an absolute instant.
+ val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789")
+ checkError(
+ exception = intercept[SparkIllegalArgumentException] {
+ CatalystTypeConverters.createToCatalystConverter(dt)(ldt)
+ },
+ condition = "INVALID_EXTERNAL_VALUE",
+ parameters = Map(
+ "other" -> ldt.toString,
+ "otherClass" -> "java.time.LocalDateTime",
+ "dataType" -> "TIMESTAMP_LTZ(9)"))
+ // The legacy `java.sql.Timestamp` external type is intentionally out of scope for
+ // `TimestampLTZNanosType` (see SPARK-57033). Verify it is rejected, not silently
+ // accepted via a fallback.
+ val ts = java.sql.Timestamp.from(Instant.parse("2019-02-26T16:56:00.123456789Z"))
+ checkError(
+ exception = intercept[SparkIllegalArgumentException] {
+ CatalystTypeConverters.createToCatalystConverter(dt)(ts)
+ },
+ condition = "INVALID_EXTERNAL_VALUE",
+ parameters = Map(
+ "other" -> ts.toString,
+ "otherClass" -> "java.sql.Timestamp",
+ "dataType" -> "TIMESTAMP_LTZ(9)"))
+ }
+
+ test("SPARK-57033: nested nanos timestamp types in arrays / maps / structs") {
+ val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789")
+ val instant = Instant.parse("2019-02-26T16:56:00.987654321Z")
+ val ldtNano = DateTimeUtils.localDateTimeToTimestampNanos(ldt, precision = 9)
+ val instantNano = DateTimeUtils.instantToTimestampNanos(instant, precision = 9)
+
+ // Array of TimestampNTZNanosType.
+ val ntzArrayType = ArrayType(TimestampNTZNanosType(9), containsNull = true)
+ val ldts = Seq(ldt, null, ldt)
+ val catArr = CatalystTypeConverters.createToCatalystConverter(ntzArrayType)(ldts)
+ .asInstanceOf[GenericArrayData]
+ assert(catArr.numElements() === 3)
+ assert(catArr.get(0, TimestampNTZNanosType(9)) === ldtNano)
+ assert(catArr.isNullAt(1))
+ assert(catArr.get(2, TimestampNTZNanosType(9)) === ldtNano)
+ assert(CatalystTypeConverters.createToScalaConverter(ntzArrayType)(catArr) === ldts)
+
+ // Map of (String -> TimestampLTZNanosType).
+ val ltzMapType = MapType(StringType, TimestampLTZNanosType(9), valueContainsNull = true)
+ val instants = Map[String, Instant]("a" -> instant, "b" -> null)
+ val catMap = CatalystTypeConverters.createToCatalystConverter(ltzMapType)(instants)
+ val backMap = CatalystTypeConverters.createToScalaConverter(ltzMapType)(catMap)
+ .asInstanceOf[Map[Any, Any]]
+ assert(backMap("a") === instant)
+ assert(backMap("b") == null)
+
+ // Struct with both nano fields plus a non-nano field, and a null row.
+ val nestedStruct = StructType(
+ StructField("ntz", TimestampNTZNanosType(9), nullable = true) ::
+ StructField("ltz", TimestampLTZNanosType(9), nullable = true) ::
+ StructField("name", StringType, nullable = true) :: Nil)
+ val outerStruct = StructType(StructField("inner", nestedStruct, nullable = true) :: Nil)
+ val outerToCat = CatalystTypeConverters.createToCatalystConverter(outerStruct)
+ val outerToScala = CatalystTypeConverters.createToScalaConverter(outerStruct)
+ val outerRow = Row(Row(ldt, instant, "abc"))
+ val nullOuterRow = Row(null)
+ assert(outerToScala(outerToCat(outerRow)) === outerRow)
+ assert(outerToScala(outerToCat(nullOuterRow)) === nullOuterRow)
+ }
+
test("converting java.time.LocalDate to DateType") {
Seq(
"0101-02-16",
@@ -579,7 +817,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
exception = intercept[SparkIllegalArgumentException] {
CatalystTypeConverters.createToCatalystConverter(gt)("test")
},
- condition = "_LEGACY_ERROR_TEMP_3219",
+ condition = "INVALID_EXTERNAL_VALUE",
parameters = Map(
"other" -> "test",
"otherClass" -> "java.lang.String",
@@ -592,7 +830,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
exception = intercept[SparkIllegalArgumentException] {
CatalystTypeConverters.createToCatalystConverter(gt)("test")
},
- condition = "_LEGACY_ERROR_TEMP_3219",
+ condition = "INVALID_EXTERNAL_VALUE",
parameters = Map(
"other" -> "test",
"otherClass" -> "java.lang.String",
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
index 09247a459b9c..69f4995220fe 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.encoders
import scala.collection.mutable
import scala.util.Random
-import org.apache.spark.SparkRuntimeException
+import org.apache.spark.{SparkException, SparkRuntimeException}
import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.CodegenInterpretedPlanTest
@@ -381,6 +381,76 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
assert(readback.get(0) === localDateTime)
}
+ test("SPARK-57033: encoding/decoding TimestampNTZNanosType to/from java.time.LocalDateTime") {
+ val inputs = Seq(
+ java.time.LocalDateTime.parse("2019-02-26T16:56:00.123456789"),
+ java.time.LocalDateTime.parse("1969-12-31T23:59:59.123456789"))
+ for (localDateTime <- inputs) {
+ for (p <- TimestampNTZNanosType.MIN_PRECISION to TimestampNTZNanosType.MAX_PRECISION) {
+ val schema = new StructType().add("t", TimestampNTZNanosType(p))
+ val encoder = ExpressionEncoder(schema).resolveAndBind()
+ val row = toRow(encoder, Row(localDateTime))
+ val expected = DateTimeUtils.localDateTimeToTimestampNanos(localDateTime, p)
+ assert(row.getTimestampNTZNanos(0) === expected)
+ val readback = fromRow(encoder, row)
+ assert(readback.get(0) === DateTimeUtils.timestampNanosToLocalDateTime(expected))
+ }
+ }
+ }
+
+ test("SPARK-57033: encoding/decoding TimestampLTZNanosType to/from java.time.Instant") {
+ val inputs = Seq(
+ java.time.Instant.parse("2019-02-26T16:56:00.123456789Z"),
+ java.time.Instant.parse("1969-12-31T23:59:59.123456789Z"))
+ for (instant <- inputs) {
+ for (p <- TimestampLTZNanosType.MIN_PRECISION to TimestampLTZNanosType.MAX_PRECISION) {
+ val schema = new StructType().add("t", TimestampLTZNanosType(p))
+ val encoder = ExpressionEncoder(schema).resolveAndBind()
+ val row = toRow(encoder, Row(instant))
+ val expected = DateTimeUtils.instantToTimestampNanos(instant, p)
+ assert(row.getTimestampLTZNanos(0) === expected)
+ val readback = fromRow(encoder, row)
+ assert(readback.get(0) === DateTimeUtils.timestampNanosToInstant(expected))
+ }
+ }
+ }
+
+ test("SPARK-57033: encoding/decoding TimestampLTZNanosType ignores java8 API flag") {
+ val instant = java.time.Instant.parse("2019-02-26T16:56:00.123456789Z")
+ val schema = new StructType().add("t", TimestampLTZNanosType())
+ Seq("true", "false").foreach { flag =>
+ withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> flag) {
+ val encoder = ExpressionEncoder(schema).resolveAndBind()
+ val row = toRow(encoder, Row(instant))
+ assert(row.getTimestampLTZNanos(0) ===
+ DateTimeUtils.instantToTimestampNanos(instant, precision = 9))
+ val readback = fromRow(encoder, row)
+ assert(readback.get(0) === instant)
+ }
+ }
+ }
+
+ test("SPARK-57033: RowEncoder rejects nanos timestamp types when feature flag is off") {
+ Seq(
+ new StructType().add("t", TimestampNTZNanosType()),
+ new StructType().add("t", TimestampLTZNanosType())
+ ).foreach { schema =>
+ withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "false") {
+ checkError(
+ exception = intercept[SparkException] {
+ ExpressionEncoder(schema)
+ },
+ condition = "FEATURE_NOT_ENABLED",
+ parameters = Map(
+ "featureName" -> "Nanosecond-precision timestamp types",
+ "configKey" -> "spark.sql.timestampNanosTypes.enabled",
+ "configValue" -> "true"
+ )
+ )
+ }
+ }
+ }
+
test("encoding/decoding DateType to/from java.time.LocalDate") {
withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
val schema = new StructType().add("d", DateType)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 4810ec69bb96..47eb4a1e3e3c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.errors.DataTypeErrors.toSQLConf
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.types.DayTimeIntervalType.{HOUR, SECOND}
import org.apache.spark.sql.types.Decimal
-import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+import org.apache.spark.unsafe.types.{CalendarInterval, TimestampNanosVal, UTF8String}
class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
@@ -1851,4 +1851,150 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
timeBucketYMInterval(1, Long.MinValue, 0L, utc)
}
}
+
+ test("SPARK-57033: java.time.LocalDateTime <-> TimestampNanosVal roundtrip") {
+ val cases = Seq(
+ LocalDateTime.parse("0001-01-01T00:00:00"),
+ LocalDateTime.parse("1582-10-02T01:02:03.04"),
+ LocalDateTime.parse("1582-12-31T23:59:59.999999999"),
+ LocalDateTime.parse("1969-12-31T23:59:59.999999999"),
+ LocalDateTime.parse("1970-01-01T00:00:00"),
+ LocalDateTime.parse("1970-01-01T00:00:00.000000001"),
+ LocalDateTime.parse("1970-01-01T00:00:00.123456789"),
+ LocalDateTime.parse("2019-02-26T16:56:00.123456789"),
+ LocalDateTime.parse("9999-12-31T23:59:59.999999999"))
+ for (ldt <- cases) {
+ val v = localDateTimeToTimestampNanos(ldt, precision = 9)
+ assert(v.nanosWithinMicro >= 0 && v.nanosWithinMicro <= 999,
+ s"nanosWithinMicro out of range for $ldt: $v")
+ assert(v.nanosWithinMicro === (ldt.getNano % 1000).toShort)
+ assert(v.epochMicros === DateTimeUtils.localDateTimeToMicros(ldt))
+ assert(timestampNanosToLocalDateTime(v) === ldt)
+ }
+ }
+
+ test("SPARK-57033: java.time.Instant <-> TimestampNanosVal roundtrip") {
+ val cases = Seq(
+ Instant.EPOCH,
+ Instant.parse("0001-01-01T00:00:00Z"),
+ Instant.parse("1582-10-02T01:02:03.04Z"),
+ Instant.parse("1582-12-31T23:59:59.999999999Z"),
+ Instant.parse("1969-12-31T23:59:59.999999999Z"),
+ Instant.parse("1970-01-01T00:00:00.000000001Z"),
+ Instant.parse("2019-02-26T16:56:00.123456789Z"),
+ Instant.parse("9999-12-31T23:59:59.999999999Z"))
+ for (i <- cases) {
+ val v = instantToTimestampNanos(i, precision = 9)
+ assert(v.nanosWithinMicro >= 0 && v.nanosWithinMicro <= 999,
+ s"nanosWithinMicro out of range for $i: $v")
+ assert(v.nanosWithinMicro === (i.getNano % 1000).toShort)
+ assert(v.epochMicros === instantToMicros(i))
+ assert(timestampNanosToInstant(v) === i)
+ }
+ }
+
+ test("SPARK-57033: TimestampNanosVal random roundtrip") {
+ val rnd = new scala.util.Random(0)
+ // Random instants across the valid range with every possible sub-micro digit.
+ val min = Instant.parse("0001-01-01T00:00:00Z").getEpochSecond
+ val max = Instant.parse("9999-12-31T23:59:59.999999999Z").getEpochSecond
+ for (_ <- 0 until 200) {
+ val secs = min + math.abs(rnd.nextLong()) % (max - min + 1)
+ val nano = rnd.nextInt(1_000_000_000)
+ val i = Instant.ofEpochSecond(secs, nano.toLong)
+ val v = instantToTimestampNanos(i, precision = 9)
+ assert(timestampNanosToInstant(v) === i)
+ val ldt = LocalDateTime.ofInstant(i, ZoneOffset.UTC)
+ val v2 = localDateTimeToTimestampNanos(ldt, precision = 9)
+ assert(timestampNanosToLocalDateTime(v2) === ldt)
+ // Internal layout matches direct decomposition.
+ assert(v === TimestampNanosVal.fromParts(instantToMicros(i), (nano % 1000).toShort))
+ }
+ }
+
+ test("SPARK-57033: localDateTimeToTimestampNanos truncates sub-micro to precision") {
+ val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789")
+ // precision 9 keeps all 3 sub-micro digits, 8 drops the last, 7 drops the last two.
+ assert(localDateTimeToTimestampNanos(ldt, precision = 9).nanosWithinMicro === 789)
+ assert(localDateTimeToTimestampNanos(ldt, precision = 8).nanosWithinMicro === 780)
+ assert(localDateTimeToTimestampNanos(ldt, precision = 7).nanosWithinMicro === 700)
+ // epochMicros is unaffected by sub-micro truncation.
+ val expectedMicros = DateTimeUtils.localDateTimeToMicros(ldt)
+ for (p <- 7 to 9) {
+ assert(localDateTimeToTimestampNanos(ldt, precision = p).epochMicros === expectedMicros)
+ }
+
+ val negativeEpochLdt = LocalDateTime.parse("1969-12-31T23:59:59.123456789")
+ assert(localDateTimeToTimestampNanos(negativeEpochLdt, precision = 9).nanosWithinMicro === 789)
+ assert(localDateTimeToTimestampNanos(negativeEpochLdt, precision = 8).nanosWithinMicro === 780)
+ assert(localDateTimeToTimestampNanos(negativeEpochLdt, precision = 7).nanosWithinMicro === 700)
+ val negativeExpectedMicros = DateTimeUtils.localDateTimeToMicros(negativeEpochLdt)
+ for (p <- 7 to 9) {
+ assert(localDateTimeToTimestampNanos(negativeEpochLdt, precision = p).epochMicros ===
+ negativeExpectedMicros)
+ }
+ }
+
+ test("SPARK-57033: instantToTimestampNanos truncates sub-micro to precision") {
+ val i = Instant.parse("2019-02-26T16:56:00.123456789Z")
+ assert(instantToTimestampNanos(i, precision = 9).nanosWithinMicro === 789)
+ assert(instantToTimestampNanos(i, precision = 8).nanosWithinMicro === 780)
+ assert(instantToTimestampNanos(i, precision = 7).nanosWithinMicro === 700)
+ val expectedMicros = instantToMicros(i)
+ for (p <- 7 to 9) {
+ assert(instantToTimestampNanos(i, precision = p).epochMicros === expectedMicros)
+ }
+
+ val negativeEpochInstant = Instant.parse("1969-12-31T23:59:59.123456789Z")
+ assert(instantToTimestampNanos(negativeEpochInstant, precision = 9).nanosWithinMicro === 789)
+ assert(instantToTimestampNanos(negativeEpochInstant, precision = 8).nanosWithinMicro === 780)
+ assert(instantToTimestampNanos(negativeEpochInstant, precision = 7).nanosWithinMicro === 700)
+ val negativeExpectedMicros = instantToMicros(negativeEpochInstant)
+ for (p <- 7 to 9) {
+ assert(instantToTimestampNanos(negativeEpochInstant, precision = p).epochMicros ===
+ negativeExpectedMicros)
+ }
+ }
+
+ test("SPARK-57033: random roundtrip across precisions floors to the precision step") {
+ val rnd = new scala.util.Random(0)
+ val min = Instant.parse("0001-01-01T00:00:00Z").getEpochSecond
+ val max = Instant.parse("9999-12-31T23:59:59.999999999Z").getEpochSecond
+ // For each random instant, verify both helpers floor the original nanosecond value
+ // (toward `-inf`) to the precision step `10^(9 - p)` ns. The whole-instant nanosecond
+ // count overflows `Long` for far-future dates, so we check the floor on the components
+ // instead: `epochMicros` is invariant across precisions (matches `instantToMicros`) and
+ // the sub-micro nanosecond residual is floored to the precision step.
+ for (_ <- 0 until 10) {
+ val secs = min + math.abs(rnd.nextLong()) % (max - min + 1)
+ val nano = rnd.nextInt(1_000_000_000)
+ val instant = Instant.ofEpochSecond(secs, nano.toLong)
+ val ldt = LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
+ val expectedMicros = instantToMicros(instant)
+ val rawSubMicro = (nano % 1000).toShort
+ for (p <- 7 to 9) {
+ val step = math.pow(10, 9 - p).toLong.toShort
+ val expectedSubMicro = ((rawSubMicro / step) * step).toShort
+
+ val ltz = instantToTimestampNanos(instant, p)
+ assert(ltz.epochMicros === expectedMicros,
+ s"LTZ p=$p instant=$instant epochMicros=${ltz.epochMicros}")
+ assert(ltz.nanosWithinMicro === expectedSubMicro,
+ s"LTZ p=$p instant=$instant nanosWithinMicro=${ltz.nanosWithinMicro}")
+ // Roundtrip preserves the truncated value.
+ val ltzBack = timestampNanosToInstant(ltz)
+ assert(instantToMicros(ltzBack) === expectedMicros)
+ assert(ltzBack.getNano % 1000 === expectedSubMicro)
+
+ val ntz = localDateTimeToTimestampNanos(ldt, p)
+ assert(ntz.epochMicros === expectedMicros,
+ s"NTZ p=$p ldt=$ldt epochMicros=${ntz.epochMicros}")
+ assert(ntz.nanosWithinMicro === expectedSubMicro,
+ s"NTZ p=$p ldt=$ldt nanosWithinMicro=${ntz.nanosWithinMicro}")
+ val ntzBack = timestampNanosToLocalDateTime(ntz)
+ assert(DateTimeUtils.localDateTimeToMicros(ntzBack) === expectedMicros)
+ assert(ntzBack.getNano % 1000 === expectedSubMicro)
+ }
+ }
+ }
}
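
The decomposition these tests exercise can be sketched outside Spark with plain `java.time`. This is a minimal illustration, not the `DateTimeUtils` implementation: the `Parts` class and the `decompose` / `recompose` names are hypothetical stand-ins for `TimestampNanosVal`, `instantToTimestampNanos` and `timestampNanosToInstant`, and the 7 to 9 precision range is assumed from the tests above.

```java
import java.time.Instant;

final class NanosDecompositionSketch {
    /** Hypothetical stand-in for TimestampNanosVal: micros since epoch plus 0..999 ns. */
    static final class Parts {
        final long epochMicros;
        final short nanosWithinMicro;

        Parts(long epochMicros, short nanosWithinMicro) {
            this.epochMicros = epochMicros;
            this.nanosWithinMicro = nanosWithinMicro;
        }
    }

    /** Splits an Instant and floors the sub-micro part to the step 10^(9 - precision). */
    static Parts decompose(Instant instant, int precision) {
        if (precision < 7 || precision > 9) {
            throw new IllegalArgumentException("precision must be in [7, 9]: " + precision);
        }
        // getNano() is always in [0, 999_999_999], so for pre-epoch instants the
        // seconds are negative and the result is still floored toward -inf.
        long epochMicros = Math.addExact(
            Math.multiplyExact(instant.getEpochSecond(), 1_000_000L),
            instant.getNano() / 1_000);
        int step = 1;
        for (int i = precision; i < 9; i++) {
            step *= 10;
        }
        int subMicro = instant.getNano() % 1_000;
        return new Parts(epochMicros, (short) (subMicro / step * step));
    }

    /** Rebuilds the Instant; floorDiv/floorMod keep negative epochMicros correct. */
    static Instant recompose(Parts parts) {
        long seconds = Math.floorDiv(parts.epochMicros, 1_000_000L);
        long microsOfSecond = Math.floorMod(parts.epochMicros, 1_000_000L);
        return Instant.ofEpochSecond(seconds, microsOfSecond * 1_000 + parts.nanosWithinMicro);
    }
}
```

Because the sub-micro part is never negative, truncation gives the same 789 / 780 / 700 values for the 1969 inputs as for the 2019 ones, which is what the `negativeEpoch*` assertions check.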
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 62d44e0af8b0..416c92602b83 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -789,6 +789,24 @@ public class JavaDatasetSuite implements Serializable {
Assertions.assertEquals(data, ds.collectAsList());
}
+ @Test
+ public void testTimestampNanosRowEncoder() {
+ final StructType schema = new StructType()
+ .add("ntz", org.apache.spark.sql.types.TimestampNTZNanosType.apply())
+ .add("ltz", org.apache.spark.sql.types.TimestampLTZNanosType.apply());
+ LocalDateTime ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789");
+ Instant instant = Instant.parse("2019-02-26T16:56:00.987654321Z");
+ List<Row> rows = Arrays.asList(create(ldt, instant), create(null, null));
+ Dataset<Row> ds = spark.createDataset(rows, Encoders.row(schema));
+ Assertions.assertEquals(schema, ds.schema());
+ List<Row> collected = ds.collectAsList();
+ Assertions.assertEquals(2, collected.size());
+ Assertions.assertEquals(ldt, collected.get(0).get(0));
+ Assertions.assertEquals(instant, collected.get(0).get(1));
+ Assertions.assertTrue(collected.get(1).isNullAt(0));
+ Assertions.assertTrue(collected.get(1).isNullAt(1));
+ }
+
@Test
public void testDurationEncoder() {
Encoder<Duration> encoder = Encoders.DURATION();
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 879569045b6d..dc930af87490 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
import java.io.{Externalizable, ObjectInput, ObjectOutput}
import java.sql.{Date, Timestamp}
+import java.time.{Instant, LocalDateTime}
import scala.collection.immutable.HashSet
import scala.collection.mutable
@@ -2631,6 +2632,49 @@ class DatasetSuite extends SharedSparkSession
assert(Seq(localDateTime).toDS().head() === localDateTime)
}
+ test("SPARK-57033: Dataset[Row] roundtrip preserves nanosecond precision") {
+ val schema = new StructType()
+ .add("ntz", TimestampNTZNanosType(9), nullable = true)
+ .add("ltz", TimestampLTZNanosType(9), nullable = true)
+ val rows = Seq(
+ Row(
+ LocalDateTime.parse("2019-02-26T16:56:00.123456789"),
+ Instant.parse("2019-02-26T16:56:00.987654321Z")),
+ Row(
+ LocalDateTime.parse("9999-12-31T23:59:59.999999999"),
+ Instant.parse("9999-12-31T23:59:59.999999999Z")),
+ Row(null, null))
+ val df = spark.createDataFrame(rows.asJava, schema)
+ assert(df.schema === schema)
+ checkAnswer(df, rows)
+ }
+
+ test("SPARK-57033: Dataset[Row] roundtrip truncates sub-micro to declared precision") {
+ val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789")
+ val instant = Instant.parse("2019-02-26T16:56:00.123456789Z")
+ val negativeEpochLdt = LocalDateTime.parse("1969-12-31T23:59:59.123456789")
+ val negativeEpochInstant = Instant.parse("1969-12-31T23:59:59.123456789Z")
+ // At p=7 the last two sub-micro digits are dropped (789 -> 700);
+ // at p=8 only the last one is dropped (789 -> 780).
+ val expectedSubMicro = Map(7 -> 700, 8 -> 780)
+
+ for (p <- 7 to 8) {
+ val schema = new StructType()
+ .add("ntz", TimestampNTZNanosType(p), nullable = true)
+ .add("ltz", TimestampLTZNanosType(p), nullable = true)
+ val rows = Seq(
+ Row(ldt, instant),
+ Row(negativeEpochLdt, negativeEpochInstant))
+ val df = spark.createDataFrame(rows.asJava, schema)
+ assert(df.schema === schema)
+ val drop = 789 - expectedSubMicro(p)
+ val expected = Seq(
+ Row(ldt.minusNanos(drop), instant.minusNanos(drop)),
+ Row(negativeEpochLdt.minusNanos(drop), negativeEpochInstant.minusNanos(drop)))
+ checkAnswer(df, expected)
+ }
+ }
+
test("SPARK-34605: implicit encoder for java.time.Duration") {
val duration = java.time.Duration.ofMinutes(10)
assert(spark.range(1).map { _ => duration }.head() === duration)
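
The expected rows in the truncation test above are built with `minusNanos(drop)`. The same external-side value can be computed directly by flooring the nano-of-second field, as in this rough sketch. The `truncateTo` helper is hypothetical and uses only `java.time`; it is not part of the Spark API, and the 7 to 9 precision range is assumed from the tests.

```java
import java.time.LocalDateTime;

final class DeclaredPrecisionSketch {
    /** Floors the fractional second of a LocalDateTime to `precision` digits (7..9). */
    static LocalDateTime truncateTo(LocalDateTime value, int precision) {
        if (precision < 7 || precision > 9) {
            throw new IllegalArgumentException("precision must be in [7, 9]: " + precision);
        }
        int step = 1;
        for (int i = precision; i < 9; i++) {
            step *= 10;
        }
        // getNano() is non-negative, so integer division floors for every date.
        return value.withNano(value.getNano() / step * step);
    }
}
```

For the `...123456789` inputs this drops 89 ns at precision 7 and 9 ns at precision 8, matching `drop = 789 - expectedSubMicro(p)` in the test.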