This is an automated email from the ASF dual-hosted git repository.

MaxGekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e3587a414caa [SPARK-57033][SQL] Add java.time LocalDateTime/Instant conversion and Dataset roundtrip for nanosecond timestamps
e3587a414caa is described below

commit e3587a414caa24900e14ac72f550ab5249817c7a
Author: Maxim Gekk <[email protected]>
AuthorDate: Sun May 31 19:28:14 2026 +0200

    [SPARK-57033][SQL] Add java.time LocalDateTime/Instant conversion and Dataset roundtrip for nanosecond timestamps
    
    ### What changes were proposed in this pull request?
    
    This PR wires `java.time.LocalDateTime` / `java.time.Instant` through the encoder, converter and Dataset stack for the nanosecond-capable timestamp types `TimestampNTZNanosType(p)` and `TimestampLTZNanosType(p)` introduced by SPARK-56981, so that `spark.createDataFrame(rows, schema).collect()` over a schema declaring a nanos timestamp column preserves full nanosecond precision end-to-end.
    
    Concretely:
    
    - Four new `DateTimeUtils` helpers: `localDateTimeToTimestampNanos`, `timestampNanosToLocalDateTime`, `instantToTimestampNanos`, `timestampNanosToInstant`. The integral micro part reuses `instantToMicros` / `localDateTimeToMicros` (and its `MIN_SECONDS` guard); the sub-micro nanos are kept as `nanosWithinMicro` in `[0, 999]`.
    - Two new precision-aware `AgnosticEncoder` leaves: `LocalDateTimeNanosEncoder(precision)` and `InstantNanosEncoder(precision)`, with `RowEncoder.encoderForDataType` mapping the nanos data types to them.
    - New `CatalystTypeConverters`, `SerializerBuildHelper` and `DeserializerBuildHelper` entries dispatching on the new encoders / data types. `TimestampLTZNanosType` always maps to `Instant`, independent of `spark.sql.datetime.java8API.enabled`.
    - `EncoderUtils.dataTypeJavaClassDefault` / `javaBoxedType` learn the new logical types so `StaticInvoke` codegen picks up `TimestampNanosVal` as the column's Java class.
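    
    The split performed by the new helpers can be sketched with plain `java.time`, without Spark on the classpath. This is only an illustration under stated assumptions: `NanosSplitSketch`, `MicrosAndNanos`, `split` and `join` are hypothetical names, `MicrosAndNanos` is a stand-in for `TimestampNanosVal`, and the sketch uses exact arithmetic instead of the `MIN_SECONDS` guard of `instantToMicros`, so it does not cover the extreme ends of the `Instant` range.
    
    ```scala
    import java.time.Instant

    object NanosSplitSketch {
      // Hypothetical stand-in for Spark's TimestampNanosVal; not the real class.
      final case class MicrosAndNanos(epochMicros: Long, nanosWithinMicro: Short)

      // Zero the low (9 - precision) digits of the sub-micro part.
      // Precision is assumed to be in [7, 9]; other values pass through unchanged.
      def truncate(nanosWithinMicro: Int, precision: Int): Int = precision match {
        case 7 => (nanosWithinMicro / 100) * 100
        case 8 => (nanosWithinMicro / 10) * 10
        case _ => nanosWithinMicro
      }

      // Instant -> (epochMicros, nanosWithinMicro).
      // Instant.getNano is always in [0, 999999999], so the micro part is a
      // floor toward -inf even for pre-epoch values.
      def split(instant: Instant, precision: Int): MicrosAndNanos = {
        val epochMicros = Math.addExact(
          Math.multiplyExact(instant.getEpochSecond, 1000000L),
          (instant.getNano / 1000).toLong)
        val sub = truncate(instant.getNano % 1000, precision)
        MicrosAndNanos(epochMicros, sub.toShort)
      }

      // (epochMicros, nanosWithinMicro) -> Instant, the reverse of `split`.
      def join(v: MicrosAndNanos): Instant = {
        val seconds = Math.floorDiv(v.epochMicros, 1000000L)
        val microsOfSecond = Math.floorMod(v.epochMicros, 1000000L)
        Instant.ofEpochSecond(seconds, microsOfSecond * 1000L + v.nanosWithinMicro)
      }
    }
    ```
    
    With precision 9, `join(split(i, 9))` returns `i` unchanged for values in range; with precision 7 or 8 the lowest two or one sub-micro digits come back as zeros.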
    
    ### Why are the changes needed?
    
    `SPARK-56981` added physical row storage for nanosecond timestamps, but there was no conversion layer from the external Java types. As a result, even with the nanos schema explicitly declared, Dataset create -> internal row -> collect roundtrips silently truncated to micros:
    
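    ```scala
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    // The nanos timestamp types are a preview feature and must be enabled first.
    spark.sql("SET spark.sql.timestampNanosTypes.enabled=true")

    val ldt = java.time.LocalDateTime.parse("2019-02-26T16:56:00.123456789")
    val schema = new StructType().add("t", TimestampNTZNanosType(9))
    spark.createDataFrame(java.util.Arrays.asList(Row(ldt)), schema).collect()(0).get(0)
    // before this PR: 2019-02-26T16:56:00.123456     (last 3 digits dropped)
    // after  this PR: 2019-02-26T16:56:00.123456789  (preserved)
    ```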
    
    This change is required for the rest of the nanos-timestamp SPIP (SPARK-56822) to be useful from the DataFrame / Dataset API.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, but only for the new `TimestampNTZNanosType` / `TimestampLTZNanosType` (preview feature, gated by `spark.sql.timestampNanosTypes.enabled`):
    - A `Dataset[Row]` whose schema declares a nanos timestamp column now accepts `LocalDateTime` (NTZ Nanos) / `Instant` (LTZ Nanos) and returns them on `collect` with full nanosecond precision.
    - A bare `LocalDateTime` or `Instant` value (without an explicit schema) still resolves to the existing micro `TimestampNTZType` / `TimestampType` encoders, so `Dataset[LocalDateTime]` / `Dataset[Instant]` behavior is unchanged.
    
    Existing micro timestamp behavior is unchanged.
    
    ### How was this patch tested?
    New unit tests, all passing locally:
    - `DateTimeUtilsSuite` - three `SPARK-57033` tests for the helpers (LDT, Instant, randomized roundtrip across the full valid range, including pre-epoch, epoch, max range, sub-micro digits, and the `nanosWithinMicro in [0, 999]` invariant).
    - `CatalystTypeConvertersSuite` - both directions for `TimestampNTZNanosType` / `TimestampLTZNanosType` across precisions 7-9, a flag-independence test for the LTZ Nanos path, and a null-row roundtrip via `Row` + schema.
    - `RowEncoderSuite` - encode/decode tests for both nanos types across all precisions and both codegen / interpreted paths, plus a Java8 flag-independence test.
    - `DatasetSuite` - end-to-end `spark.createDataFrame(rows, schema).collect()` with sub-micro fractional digits, a max-range edge value, and a null row.
    - `JavaDatasetSuite` - `testTimestampNanosRowEncoder` exercising `spark.createDataset(rows, Encoders.row(schema))` from Java.
    Regression check across adjacent suites (no failures):
    - `build/sbt 'catalyst/testOnly *DateTimeUtilsSuite *CatalystTypeConvertersSuite *RowEncoderSuite'` - 233/233 pass.
    - `build/sbt 'catalyst/testOnly *ExpressionEncoderSuite *TimestampNanosRowSuite *LiteralExpressionSuite'` - 453/453 pass.
    - `build/sbt 'sql/testOnly *DatasetSuite'` - 276/276 pass (221 Scala + 55 Java).
    - `./dev/scalastyle` and `./dev/lint-java` clean.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Generated-by: Cursor 2.0 (Claude Opus 4.7)
    
    Closes #56158 from MaxGekk/nanos-java-types.
    
    Authored-by: Maxim Gekk <[email protected]>
    Signed-off-by: Max Gekk <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  11 +-
 docs/sql-ref-datatypes.md                          |   2 +-
 .../sql/catalyst/encoders/AgnosticEncoder.scala    |   8 +
 .../spark/sql/catalyst/encoders/RowEncoder.scala   |  14 +-
 .../sql/catalyst/util/SparkDateTimeUtils.scala     |  78 ++++++-
 .../sql/catalyst/CatalystTypeConverters.scala      |  67 +++++-
 .../sql/catalyst/DeserializerBuildHelper.scala     |  24 +-
 .../spark/sql/catalyst/SerializerBuildHelper.scala |  26 ++-
 .../spark/sql/catalyst/encoders/EncoderUtils.scala |   8 +-
 .../sql/catalyst/CatalystTypeConvertersSuite.scala | 250 ++++++++++++++++++++-
 .../sql/catalyst/encoders/RowEncoderSuite.scala    |  72 +++++-
 .../sql/catalyst/util/DateTimeUtilsSuite.scala     | 148 +++++++++++-
 .../org/apache/spark/sql/JavaDatasetSuite.java     |  18 ++
 .../scala/org/apache/spark/sql/DatasetSuite.scala  |  44 ++++
 14 files changed, 740 insertions(+), 30 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 91976d21934d..a5e539199b75 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -3757,6 +3757,12 @@
     ],
     "sqlState" : "42K0N"
   },
+  "INVALID_EXTERNAL_VALUE" : {
+    "message" : [
+      "The value (<other>) of the type (<otherClass>) cannot be converted to 
the <dataType> type."
+    ],
+    "sqlState" : "42K0N"
+  },
   "INVALID_EXTRACT_BASE_FIELD_TYPE" : {
     "message" : [
       "Can't extract a value from <base>. Need a complex type [STRUCT, ARRAY, 
MAP] but got <other>."
@@ -11526,11 +11532,6 @@
       "Must be 2 children: <others>"
     ]
   },
-  "_LEGACY_ERROR_TEMP_3219" : {
-    "message" : [
-      "The value (<other>) of the type (<otherClass>) cannot be converted to 
the <dataType> type."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_3220" : {
     "message" : [
       "The value (<other>) of the type (<otherClass>) cannot be converted to 
an array of <elementType>"
diff --git a/docs/sql-ref-datatypes.md b/docs/sql-ref-datatypes.md
index fe1b8724d8d6..e0d36631dec0 100644
--- a/docs/sql-ref-datatypes.md
+++ b/docs/sql-ref-datatypes.md
@@ -54,7 +54,7 @@ Spark SQL and DataFrames support the following data types:
   - `TimestampNTZType`: Timestamp without time zone(TIMESTAMP_NTZ). It 
represents values comprising values of fields year, month, day,
   hour, minute, and second. All operations are performed without taking any 
time zone into account.
     - Note: TIMESTAMP in Spark is a user-specified alias associated with one 
of the TIMESTAMP_LTZ and TIMESTAMP_NTZ variations.  Users can set the default 
timestamp type as `TIMESTAMP_LTZ`(default value) or `TIMESTAMP_NTZ` via the 
configuration `spark.sql.timestampType`.
-  - `TimestampNTZNanosType(precision)` / `TimestampLTZNanosType(precision)`: 
Preview nanosecond-capable variants of `TIMESTAMP_NTZ` and `TIMESTAMP_LTZ` with 
fractional seconds precision `precision` in `[7, 9]`. Unparameterized 
`TIMESTAMP`, `TIMESTAMP_NTZ`, and `TIMESTAMP_LTZ` remain microsecond types. 
Enable the preview feature with `SET 
spark.sql.timestampNanosTypes.enabled=true;` before using these types in 
schemas or SQL.
+  - `TimestampNTZNanosType(precision)` / `TimestampLTZNanosType(precision)`: 
Preview nanosecond-capable variants of `TIMESTAMP_NTZ` and `TIMESTAMP_LTZ` with 
fractional seconds precision `precision` in `[7, 9]`. Unparameterized 
`TIMESTAMP`, `TIMESTAMP_NTZ`, and `TIMESTAMP_LTZ` remain microsecond types. In 
schema-driven Dataset/DataFrame conversion, Spark maps `TimestampNTZNanosType` 
to `java.time.LocalDateTime` and `TimestampLTZNanosType` to 
`java.time.Instant`; values with more sub-micro [...]
 
 * Interval types
   - `YearMonthIntervalType(startField, endField)`: Represents a year-month 
interval which is made up of a contiguous subset of the following fields:
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
index 20949c188cb8..57c15de4f0db 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
@@ -257,6 +257,14 @@ object AgnosticEncoders {
   case class InstantEncoder(override val lenientSerialization: Boolean)
       extends LeafEncoder[Instant](TimestampType)
   case object LocalDateTimeEncoder extends 
LeafEncoder[LocalDateTime](TimestampNTZType)
+  // Nanosecond-precision counterparts of `LocalDateTimeEncoder` / 
`InstantEncoder(false)`.
+  // They are used by `RowEncoder` when the schema declares a 
`TimestampNTZNanosType(p)` or
+  // `TimestampLTZNanosType(p)` column, so Dataset create/collect roundtrips 
preserve full
+  // nanosecond precision. See SPARK-57033.
+  case class LocalDateTimeNanosEncoder(precision: Int)
+      extends LeafEncoder[LocalDateTime](TimestampNTZNanosType(precision))
+  case class InstantNanosEncoder(precision: Int)
+      extends LeafEncoder[Instant](TimestampLTZNanosType(precision))
   case object LocalTimeEncoder extends LeafEncoder[LocalTime](TimeType())
 
   case class SparkDecimalEncoder(dt: DecimalType) extends 
LeafEncoder[Decimal](dt)
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index bad673672188..705d5d8f11b1 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -21,8 +21,8 @@ import scala.collection.mutable
 import scala.reflect.classTag
 
 import org.apache.spark.sql.{AnalysisException, Row}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, 
BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, 
BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, 
CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, 
GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, 
JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, 
MapEncoder, NullEncoder, RowEncoder => Agnosti [...]
-import org.apache.spark.sql.errors.DataTypeErrorsBase
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, 
BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, 
BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, 
CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, 
GeographyEncoder, GeometryEncoder, InstantEncoder, InstantNanosEncoder, 
IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, 
LocalDateTimeNanosEncoder, LocalTimeEncoder [...]
+import org.apache.spark.sql.errors.{DataTypeErrors, DataTypeErrorsBase}
 import org.apache.spark.sql.internal.SqlApiConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.ops.TypeApiOps
@@ -50,6 +50,8 @@ import org.apache.spark.util.ArrayImplicits._
  *   TimestampType -> java.time.Instant if spark.sql.datetime.java8API.enabled 
is true
  *
  *   TimestampNTZType -> java.time.LocalDateTime
+ *   TimestampNTZNanosType -> java.time.LocalDateTime
+ *   TimestampLTZNanosType -> java.time.Instant
  *   TimeType -> java.time.LocalTime
  *
  *   DayTimeIntervalType -> java.time.Duration
@@ -97,6 +99,14 @@ object RowEncoder extends DataTypeErrorsBase {
       case TimestampType if SqlApiConf.get.datetimeJava8ApiEnabled => 
InstantEncoder(lenient)
       case TimestampType => TimestampEncoder(lenient)
       case TimestampNTZType => LocalDateTimeEncoder
+      // Nano timestamp types intentionally do not honor `lenient`: legacy 
`java.sql.Timestamp` /
+      // `java.sql.Date` external types are out of scope for nanosecond 
precision (SPARK-57033).
+      case t: TimestampNTZNanosType =>
+        DataTypeErrors.checkTimestampNanosTypesEnabled()
+        LocalDateTimeNanosEncoder(t.precision)
+      case t: TimestampLTZNanosType =>
+        DataTypeErrors.checkTimestampNanosTypesEnabled()
+        InstantNanosEncoder(t.precision)
       case DateType if SqlApiConf.get.datetimeJava8ApiEnabled => 
LocalDateEncoder(lenient)
       case DateType => DateEncoder(lenient)
       case _: TimeType => LocalTimeEncoder
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
index 9684737a2286..597a96c548ce 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import 
org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, 
rebaseGregorianToJulianMicros, rebaseJulianToGregorianDays, 
rebaseJulianToGregorianMicros}
 import org.apache.spark.sql.errors.ExecutionErrors
 import org.apache.spark.sql.types.{DateType, TimestampType, TimeType}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{TimestampNanosVal, UTF8String}
 import org.apache.spark.util.SparkClassUtils
 
 trait SparkDateTimeUtils {
@@ -208,6 +208,82 @@ trait SparkDateTimeUtils {
     instantToMicros(localDateTime.toInstant(ZoneOffset.UTC))
   }
 
+  /**
+   * Truncates the sub-microsecond nanosecond part to the given timestamp 
precision `p` in [7, 9].
+   * Precision 9 keeps all three digits, 8 zeros the last digit, 7 zeros the 
last two.
+   *
+   * The input is the already-extracted `nanosWithinMicro` component 
(`0..999`), so truncation is
+   * independent of the epoch sign of the original timestamp value.
+   *
+   * Precisions outside `[7, 9]` are passed through unchanged because the 
surrounding timestamp
+   * nanos types validate the bound.
+   */
+  private def truncateNanosWithinMicroToPrecision(nanosWithinMicro: Int, 
precision: Int): Int = {
+    precision match {
+      case 7 => (nanosWithinMicro / 100) * 100
+      case 8 => (nanosWithinMicro / 10) * 10
+      case _ => nanosWithinMicro
+    }
+  }
+
+  /**
+   * Converts a `java.time.LocalDateTime` into the composite `(epochMicros, 
nanosWithinMicro)`
+   * pair used by `TimestampNTZNanosType(precision)` (interpreted at UTC). 
`epochMicros` comes
+   * from [[localDateTimeToMicros]] (which is floor toward `-inf` for the 
integral micro part);
+   * the last three decimal digits of `localDateTime.getNano` (`[0, 999]`) 
become
+   * `nanosWithinMicro` after dropping `(9 - precision)` low digits.
+   *
+   * Combined, the result is the floor toward `-inf` of the original 
nanosecond value rounded down
+   * to the precision step (10^(9 - precision) ns). At `precision = 9` the 
conversion is lossless
+   * within the valid range; at 7 / 8 the lowest 2 / 1 sub-micro digits are 
dropped. The same
+   * flooring will be the basis of the future `CAST(... AS 
TIMESTAMP_NTZ(precision))` rule.
+   */
+  def localDateTimeToTimestampNanos(
+      localDateTime: LocalDateTime,
+      precision: Int): TimestampNanosVal = {
+    val epochMicros = localDateTimeToMicros(localDateTime)
+    val rawNanosWithinMicro = localDateTime.getNano % NANOS_PER_MICROS.toInt
+    val nanosWithinMicro = 
truncateNanosWithinMicroToPrecision(rawNanosWithinMicro, precision)
+    TimestampNanosVal.fromParts(epochMicros, nanosWithinMicro.toShort)
+  }
+
+  /**
+   * Reverse of [[localDateTimeToTimestampNanos]]: rebuilds a 
`java.time.LocalDateTime` (at UTC)
+   * from a `TimestampNanosVal`. `nanosWithinMicro` is in `[0, 999]` so 
`plusNanos` never crosses
+   * the second boundary.
+   */
+  def timestampNanosToLocalDateTime(v: TimestampNanosVal): LocalDateTime = {
+    microsToLocalDateTime(v.epochMicros).plusNanos(v.nanosWithinMicro.toLong)
+  }
+
+  /**
+   * Converts a `java.time.Instant` into the composite `(epochMicros, 
nanosWithinMicro)` pair used
+   * by `TimestampLTZNanosType(precision)`. `epochMicros` comes from 
[[instantToMicros]] (floor
+   * toward `-inf` for the integral micro part); the last three decimal digits 
of
+   * `instant.getNano` (`[0, 999]`) become `nanosWithinMicro` after dropping 
`(9 - precision)` low
+   * digits.
+   *
+   * Combined, the result is the floor toward `-inf` of the original 
nanosecond value rounded down
+   * to the precision step (10^(9 - precision) ns). At `precision = 9` the 
conversion is lossless
+   * within the valid range; at 7 / 8 the lowest 2 / 1 sub-micro digits are 
dropped. The same
+   * flooring will be the basis of the future `CAST(... AS 
TIMESTAMP_LTZ(precision))` rule.
+   */
+  def instantToTimestampNanos(instant: Instant, precision: Int): 
TimestampNanosVal = {
+    val epochMicros = instantToMicros(instant)
+    val rawNanosWithinMicro = instant.getNano % NANOS_PER_MICROS.toInt
+    val nanosWithinMicro = 
truncateNanosWithinMicroToPrecision(rawNanosWithinMicro, precision)
+    TimestampNanosVal.fromParts(epochMicros, nanosWithinMicro.toShort)
+  }
+
+  /**
+   * Reverse of [[instantToTimestampNanos]]: rebuilds a `java.time.Instant` 
from a
+   * `TimestampNanosVal`. `nanosWithinMicro` is in `[0, 999]` so `plusNanos` 
never crosses the
+   * second boundary.
+   */
+  def timestampNanosToInstant(v: TimestampNanosVal): Instant = {
+    microsToInstant(v.epochMicros).plusNanos(v.nanosWithinMicro.toLong)
+  }
+
   /**
    * Converts the local date to the number of days since 1970-01-01.
    */
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index be66f851e361..de131f1d58e0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.DayTimeIntervalType._
 import org.apache.spark.sql.types.YearMonthIntervalType._
-import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, UTF8String}
+import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, 
TimestampNanosVal, UTF8String}
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.collection.Utils
 
@@ -88,6 +88,8 @@ object CatalystTypeConverters {
       case TimestampType if SQLConf.get.datetimeJava8ApiEnabled => 
InstantConverter
       case TimestampType => TimestampConverter
       case TimestampNTZType => TimestampNTZConverter
+      case t: TimestampNTZNanosType => new TimestampNTZNanosConverter(t)
+      case t: TimestampLTZNanosType => new TimestampLTZNanosConverter(t)
       case dt: DecimalType => new DecimalConverter(dt)
       case BooleanType => BooleanConverter
       case ByteType => ByteConverter
@@ -298,7 +300,7 @@ object CatalystTypeConverters {
         }
         new GenericInternalRow(ar)
       case other => throw new SparkIllegalArgumentException(
-        errorClass = "_LEGACY_ERROR_TEMP_3219",
+        errorClass = "INVALID_EXTERNAL_VALUE",
         messageParameters = scala.collection.immutable.Map(
           "other" -> other.toString,
           "otherClass" -> other.getClass.getCanonicalName,
@@ -357,7 +359,7 @@ object CatalystTypeConverters {
       case chr: Char => UTF8String.fromString(chr.toString)
       case ac: Array[Char] => UTF8String.fromString(String.valueOf(ac))
       case other => throw new SparkIllegalArgumentException(
-        errorClass = "_LEGACY_ERROR_TEMP_3219",
+        errorClass = "INVALID_EXTERNAL_VALUE",
         messageParameters = scala.collection.immutable.Map(
           "other" -> other.toString,
           "otherClass" -> other.getClass.getCanonicalName,
@@ -383,7 +385,7 @@ object CatalystTypeConverters {
       case g: org.apache.spark.sql.types.Geometry if 
SQLConf.get.geospatialEnabled =>
         STUtils.serializeGeomFromWKB(g, dataType)
       case other => throw new SparkIllegalArgumentException(
-        errorClass = "_LEGACY_ERROR_TEMP_3219",
+        errorClass = "INVALID_EXTERNAL_VALUE",
         messageParameters = scala.collection.immutable.Map(
           "other" -> other.toString,
           "otherClass" -> other.getClass.getCanonicalName,
@@ -408,7 +410,7 @@ object CatalystTypeConverters {
       case g: org.apache.spark.sql.types.Geography if 
SQLConf.get.geospatialEnabled =>
         STUtils.serializeGeogFromWKB(g, dataType)
       case other => throw new SparkIllegalArgumentException(
-        errorClass = "_LEGACY_ERROR_TEMP_3219",
+        errorClass = "INVALID_EXTERNAL_VALUE",
         messageParameters = scala.collection.immutable.Map(
           "other" -> other.toString,
           "otherClass" -> other.getClass.getCanonicalName,
@@ -432,7 +434,7 @@ object CatalystTypeConverters {
       case d: Date => DateTimeUtils.fromJavaDate(d)
       case l: LocalDate => DateTimeUtils.localDateToDays(l)
       case other => throw new SparkIllegalArgumentException(
-        errorClass = "_LEGACY_ERROR_TEMP_3219",
+        errorClass = "INVALID_EXTERNAL_VALUE",
         messageParameters = scala.collection.immutable.Map(
           "other" -> other.toString,
           "otherClass" -> other.getClass.getCanonicalName,
@@ -472,7 +474,7 @@ object CatalystTypeConverters {
       case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
       case i: Instant => DateTimeUtils.instantToMicros(i)
       case other => throw new SparkIllegalArgumentException(
-        errorClass = "_LEGACY_ERROR_TEMP_3219",
+        errorClass = "INVALID_EXTERNAL_VALUE",
         messageParameters = scala.collection.immutable.Map(
           "other" -> other.toString,
           "otherClass" -> other.getClass.getCanonicalName,
@@ -500,7 +502,7 @@ object CatalystTypeConverters {
     override def toCatalystImpl(scalaValue: Any): Any = scalaValue match {
       case l: LocalDateTime => DateTimeUtils.localDateTimeToMicros(l)
       case other => throw new SparkIllegalArgumentException(
-        errorClass = "_LEGACY_ERROR_TEMP_3219",
+        errorClass = "INVALID_EXTERNAL_VALUE",
         messageParameters = scala.collection.immutable.Map(
           "other" -> other.toString,
           "otherClass" -> other.getClass.getCanonicalName,
@@ -515,6 +517,50 @@ object CatalystTypeConverters {
       DateTimeUtils.microsToLocalDateTime(row.getLong(column))
   }
 
+  private class TimestampNTZNanosConverter(dataType: TimestampNTZNanosType)
+    extends CatalystTypeConverter[Any, LocalDateTime, TimestampNanosVal] {
+    override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = 
scalaValue match {
+      case l: LocalDateTime => DateTimeUtils.localDateTimeToTimestampNanos(l, 
dataType.precision)
+      case other => throw new SparkIllegalArgumentException(
+        errorClass = "INVALID_EXTERNAL_VALUE",
+        messageParameters = scala.collection.immutable.Map(
+          "other" -> other.toString,
+          "otherClass" -> other.getClass.getCanonicalName,
+          "dataType" -> dataType.sql))
+    }
+
+    override def toScala(catalystValue: TimestampNanosVal): LocalDateTime =
+      if (catalystValue == null) null
+      else DateTimeUtils.timestampNanosToLocalDateTime(catalystValue)
+
+    override def toScalaImpl(row: InternalRow, column: Int): LocalDateTime =
+      
DateTimeUtils.timestampNanosToLocalDateTime(row.getTimestampNTZNanos(column))
+  }
+
+  // Always maps `TimestampLTZNanosType` to `java.time.Instant`. Unlike micro 
`TimestampType`,
+  // the mapping does not consult `spark.sql.datetime.java8API.enabled`: the 
nanos LTZ type is
+  // post-Java-8 and the legacy `java.sql.Timestamp` external type is 
intentionally out of scope
+  // here. See SPARK-57033.
+  private class TimestampLTZNanosConverter(dataType: TimestampLTZNanosType)
+    extends CatalystTypeConverter[Any, Instant, TimestampNanosVal] {
+    override def toCatalystImpl(scalaValue: Any): TimestampNanosVal = 
scalaValue match {
+      case i: Instant => DateTimeUtils.instantToTimestampNanos(i, 
dataType.precision)
+      case other => throw new SparkIllegalArgumentException(
+        errorClass = "INVALID_EXTERNAL_VALUE",
+        messageParameters = scala.collection.immutable.Map(
+          "other" -> other.toString,
+          "otherClass" -> other.getClass.getCanonicalName,
+          "dataType" -> dataType.sql))
+    }
+
+    override def toScala(catalystValue: TimestampNanosVal): Instant =
+      if (catalystValue == null) null
+      else DateTimeUtils.timestampNanosToInstant(catalystValue)
+
+    override def toScalaImpl(row: InternalRow, column: Int): Instant =
+      DateTimeUtils.timestampNanosToInstant(row.getTimestampLTZNanos(column))
+  }
+
   private class DecimalConverter(dataType: DecimalType)
     extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] {
 
@@ -527,7 +573,7 @@ object CatalystTypeConverters {
         case d: JavaBigInteger => Decimal(d)
         case d: Decimal => d
         case other => throw new SparkIllegalArgumentException(
-          errorClass = "_LEGACY_ERROR_TEMP_3219",
+          errorClass = "INVALID_EXTERNAL_VALUE",
           messageParameters = scala.collection.immutable.Map(
             "other" -> other.toString,
             "otherClass" -> other.getClass.getCanonicalName,
@@ -655,6 +701,9 @@ object CatalystTypeConverters {
     case ld: LocalDate => LocalDateConverter.toCatalyst(ld)
     case t: LocalTime => TimeConverter.toCatalyst(t)
     case t: Timestamp => TimestampConverter.toCatalyst(t)
+    // SPARK-57033: schema-less convertToCatalyst keeps bare `Instant` / 
`LocalDateTime` on the
+    // microsecond converters. The nanosecond path is schema-driven only - 
users opt in via an
+    // explicit `TimestampLTZNanosType` / `TimestampNTZNanosType` column in 
the schema.
     case i: Instant => InstantConverter.toCatalyst(i)
     case l: LocalDateTime => TimestampNTZConverter.toCatalyst(l)
     case d: BigDecimal =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
index 8bd162afd56d..17ba4fe4203b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.{expressions => exprs}
 import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, 
UnresolvedExtractValue}
 import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, 
AgnosticEncoders, AgnosticExpressionPathEncoder, Codec, JavaSerializationCodec, 
KryoSerializationCodec}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, 
BoxedLeafEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, 
GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, 
JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, 
LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, 
OptionEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, 
PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, Primi [...]
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, 
BoxedLeafEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, 
GeographyEncoder, GeometryEncoder, InstantEncoder, InstantNanosEncoder, 
IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, 
JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, 
LocalDateTimeNanosEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, 
PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, P [...]
 import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{dataTypeForClass, 
externalDataTypeFor, isNativeEncoder}
 import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField, 
IsNull, Literal, MapKeys, MapValues, UpCast}
 import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, 
CreateExternalRow, DecodeUsingSerializer, InitializeJavaBean, Invoke, 
NewInstance, StaticInvoke, UnresolvedCatalystToExternalMap, 
UnresolvedMapObjects, WrapOption}
@@ -176,6 +176,24 @@ object DeserializerBuildHelper {
       returnNullable = false)
   }
 
+  def createDeserializerForLocalDateTimeNanos(path: Expression): Expression = {
+    StaticInvoke(
+      DateTimeUtils.getClass,
+      ObjectType(classOf[java.time.LocalDateTime]),
+      "timestampNanosToLocalDateTime",
+      path :: Nil,
+      returnNullable = false)
+  }
+
+  def createDeserializerForInstantNanos(path: Expression): Expression = {
+    StaticInvoke(
+      DateTimeUtils.getClass,
+      ObjectType(classOf[java.time.Instant]),
+      "timestampNanosToInstant",
+      path :: Nil,
+      returnNullable = false)
+  }
+
   def createDeserializerForLocalTime(path: Expression): Expression = {
     StaticInvoke(
       DateTimeUtils.getClass,
@@ -356,6 +374,10 @@ object DeserializerBuildHelper {
       createDeserializerForInstant(path)
     case LocalDateTimeEncoder =>
       createDeserializerForLocalDateTime(path)
+    case _: LocalDateTimeNanosEncoder =>
+      createDeserializerForLocalDateTimeNanos(path)
+    case _: InstantNanosEncoder =>
+      createDeserializerForInstantNanos(path)
     case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled =>
       throw 
org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError()
     case LocalTimeEncoder =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
index 37a4efc65739..72337a4b2185 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
@@ -22,7 +22,7 @@ import scala.language.existentials
 import org.apache.spark.sql.catalyst.{expressions => exprs}
 import org.apache.spark.sql.catalyst.DeserializerBuildHelper.expressionWithNullSafety
 import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders, AgnosticExpressionPathEncoder, Codec, JavaSerializationCodec, KryoSerializationCodec}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, Opt [...]
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, GeometryEncoder, InstantEncoder, InstantNanosEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalDateTim [...]
 import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{externalDataTypeFor, isNativeEncoder, lenientExternalDataTypeFor}
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, CheckOverflow, CreateNamedStruct, Expression, IsNull, KnownNotNull, Literal, UnsafeArrayData}
 import org.apache.spark.sql.catalyst.expressions.objects._
@@ -166,6 +166,28 @@ object SerializerBuildHelper {
       returnNullable = false)
   }
 
+  def createSerializerForLocalDateTimeNanos(
+      inputObject: Expression,
+      precision: Int): Expression = {
+    StaticInvoke(
+      DateTimeUtils.getClass,
+      TimestampNTZNanosType(precision),
+      "localDateTimeToTimestampNanos",
+      inputObject :: Literal(precision) :: Nil,
+      returnNullable = false)
+  }
+
+  def createSerializerForInstantNanos(
+      inputObject: Expression,
+      precision: Int): Expression = {
+    StaticInvoke(
+      DateTimeUtils.getClass,
+      TimestampLTZNanosType(precision),
+      "instantToTimestampNanos",
+      inputObject :: Literal(precision) :: Nil,
+      returnNullable = false)
+  }
+
   def createSerializerForJavaLocalDate(inputObject: Expression): Expression = {
     StaticInvoke(
       DateTimeUtils.getClass,
@@ -376,6 +398,8 @@ object SerializerBuildHelper {
     case TimestampEncoder(false) => createSerializerForSqlTimestamp(input)
     case InstantEncoder(false) => createSerializerForJavaInstant(input)
     case LocalDateTimeEncoder => createSerializerForLocalDateTime(input)
+    case LocalDateTimeNanosEncoder(p) => createSerializerForLocalDateTimeNanos(input, p)
+    case InstantNanosEncoder(p) => createSerializerForInstantNanos(input, p)
     case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled =>
       throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError()
     case LocalTimeEncoder => createSerializerForLocalTime(input)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala
index 0fce96c15997..f65203aed8c7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/EncoderUtils.scala
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.types.{PhysicalBinaryType, PhysicalIntegerType, PhysicalLongType}
 import org.apache.spark.sql.catalyst.types.ops.TypeOps
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
-import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType}
-import org.apache.spark.unsafe.types.{CalendarInterval, GeographyVal, GeometryVal, UTF8String, VariantVal}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampLTZNanosType, TimestampNTZNanosType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType}
+import org.apache.spark.unsafe.types.{CalendarInterval, GeographyVal, GeometryVal, TimestampNanosVal, UTF8String, VariantVal}
 
 /**
  * :: DeveloperApi ::
@@ -107,6 +107,8 @@ object EncoderUtils {
       case _: DayTimeIntervalType => classOf[PhysicalLongType.InternalType]
       case _: YearMonthIntervalType => classOf[PhysicalIntegerType.InternalType]
       case _: TimeType => classOf[PhysicalLongType.InternalType]
+      case _: TimestampNTZNanosType => classOf[TimestampNanosVal]
+      case _: TimestampLTZNanosType => classOf[TimestampNanosVal]
       case _: StringType => classOf[UTF8String]
       case _: StructType => classOf[InternalRow]
       case _: ArrayType => classOf[ArrayData]
@@ -126,6 +128,8 @@ object EncoderUtils {
     case BinaryType => classOf[Array[Byte]]
     case _: StringType => classOf[UTF8String]
     case CalendarIntervalType => classOf[CalendarInterval]
+    case _: TimestampNTZNanosType => classOf[TimestampNanosVal]
+    case _: TimestampLTZNanosType => classOf[TimestampNanosVal]
     case _: StructType => classOf[InternalRow]
     case _: ArrayType => classOf[ArrayData]
     case _: MapType => classOf[MapData]
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
index 222465d82c02..dbb9bc04914a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.DayTimeIntervalType._
 import org.apache.spark.sql.types.YearMonthIntervalType._
-import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, UTF8String}
+import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, TimestampNanosVal, UTF8String}
 
 class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
 
@@ -109,7 +109,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
       exception = intercept[SparkIllegalArgumentException] {
         CatalystTypeConverters.createToCatalystConverter(structType)("test")
       },
-      condition = "_LEGACY_ERROR_TEMP_3219",
+      condition = "INVALID_EXTERNAL_VALUE",
       parameters = Map(
         "other" -> "test",
         "otherClass" -> "java.lang.String",
@@ -149,7 +149,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
       exception = intercept[SparkIllegalArgumentException] {
         CatalystTypeConverters.createToCatalystConverter(decimalType)("test")
       },
-      condition = "_LEGACY_ERROR_TEMP_3219",
+      condition = "INVALID_EXTERNAL_VALUE",
       parameters = Map(
         "other" -> "test",
         "otherClass" -> "java.lang.String",
@@ -169,7 +169,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
       exception = intercept[SparkIllegalArgumentException] {
         CatalystTypeConverters.createToCatalystConverter(StringType)(0.1)
       },
-      condition = "_LEGACY_ERROR_TEMP_3219",
+      condition = "INVALID_EXTERNAL_VALUE",
       parameters = Map(
         "other" -> "0.1",
         "otherClass" -> "java.lang.Double",
@@ -250,6 +250,244 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
     }
   }
 
+  test("SPARK-57033: converting java.time.LocalDateTime to TimestampNTZNanosType") {
+    Seq(
+      "0001-01-01T00:00:00",
+      "1582-10-02T01:02:03.04",
+      "1582-12-31T23:59:59.999999999",
+      "1970-01-01T00:00:00.000000001",
+      "1972-12-31T23:59:59.123456789",
+      "2019-02-26T16:56:00.123456789",
+      "9999-12-31T23:59:59.999999999").foreach { text =>
+      val input = LocalDateTime.parse(text)
+      Seq(7, 8, 9).foreach { p =>
+        val dt = TimestampNTZNanosType(p)
+        val result = CatalystTypeConverters.createToCatalystConverter(dt)(input)
+        val expected = DateTimeUtils.localDateTimeToTimestampNanos(input, p)
+        assert(result === expected)
+      }
+    }
+  }
+
+  test("SPARK-57033: converting TimestampNTZNanosType to java.time.LocalDateTime") {
+    Seq(
+      "1582-10-02T01:02:03.04",
+      "1582-12-31T23:59:59.999999999",
+      "1970-01-01T00:00:00.000000001",
+      "1972-12-31T23:59:59.123456789",
+      "2019-02-26T16:56:00.123456789",
+      "9999-12-31T23:59:59.999999999").foreach { text =>
+      val ldt = LocalDateTime.parse(text)
+      val v = DateTimeUtils.localDateTimeToTimestampNanos(ldt, precision = 9)
+      Seq(7, 8, 9).foreach { p =>
+        val dt = TimestampNTZNanosType(p)
+        assert(CatalystTypeConverters.createToScalaConverter(dt)(v) === ldt)
+      }
+    }
+  }
+
+  test("SPARK-57033: converting java.time.Instant to TimestampLTZNanosType") {
+    Seq(
+      "0001-01-01T00:00:00Z",
+      "1582-10-02T01:02:03.04Z",
+      "1582-12-31T23:59:59.999999999Z",
+      "1970-01-01T00:00:00.000000001Z",
+      "1972-12-31T23:59:59.123456789Z",
+      "2019-02-26T16:56:00.123456789Z",
+      "9999-12-31T23:59:59.999999999Z").foreach { text =>
+      val input = Instant.parse(text)
+      Seq(7, 8, 9).foreach { p =>
+        val dt = TimestampLTZNanosType(p)
+        val result = CatalystTypeConverters.createToCatalystConverter(dt)(input)
+        val expected = DateTimeUtils.instantToTimestampNanos(input, p)
+        assert(result === expected)
+      }
+    }
+  }
+
+  test("SPARK-57033: converting TimestampLTZNanosType to java.time.Instant") {
+    Seq(
+      "1582-10-02T01:02:03.04Z",
+      "1582-12-31T23:59:59.999999999Z",
+      "1970-01-01T00:00:00.000000001Z",
+      "1972-12-31T23:59:59.123456789Z",
+      "2019-02-26T16:56:00.123456789Z",
+      "9999-12-31T23:59:59.999999999Z").foreach { text =>
+      val instant = Instant.parse(text)
+      val v = DateTimeUtils.instantToTimestampNanos(instant, precision = 9)
+      Seq(7, 8, 9).foreach { p =>
+        val dt = TimestampLTZNanosType(p)
+        assert(CatalystTypeConverters.createToScalaConverter(dt)(v) === instant)
+      }
+    }
+  }
+
+  test("SPARK-57033: TimestampLTZNanosType -> Instant ignores java8 API flag") {
+    val instant = Instant.parse("2019-02-26T16:56:00.123456789Z")
+    val v = DateTimeUtils.instantToTimestampNanos(instant, precision = 9)
+    val dt = TimestampLTZNanosType()
+    Seq("true", "false").foreach { flag =>
+      withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> flag) {
+        assert(CatalystTypeConverters.createToCatalystConverter(dt)(instant) === v)
+        assert(CatalystTypeConverters.createToScalaConverter(dt)(v) === instant)
+      }
+    }
+  }
+
+  test("SPARK-57033: TimestampNanosConverter null handling in rows") {
+    val schema = StructType(
+      StructField("ntz", TimestampNTZNanosType(9), nullable = true) ::
+        StructField("ltz", TimestampLTZNanosType(9), nullable = true) :: Nil)
+    val toCat = CatalystTypeConverters.createToCatalystConverter(schema)
+    val toScala = CatalystTypeConverters.createToScalaConverter(schema)
+    // Reference value to ensure non-null cells are kept as-is.
+    val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789")
+    val instant = Instant.parse("2019-02-26T16:56:00.987654321Z")
+    val row = Row(ldt, instant)
+    val catalystRow = toCat(row).asInstanceOf[InternalRow]
+    assert(catalystRow.getTimestampNTZNanos(0) ===
+      DateTimeUtils.localDateTimeToTimestampNanos(ldt, precision = 9))
+    assert(catalystRow.getTimestampLTZNanos(1) ===
+      DateTimeUtils.instantToTimestampNanos(instant, precision = 9))
+    assert(toScala(catalystRow) === row)
+    // Null row.
+    val nullRow = Row.fromSeq(Seq(null, null))
+    assert(toScala(toCat(nullRow)) === nullRow)
+  }
+
+  test("SPARK-57033: TimestampNTZNanosType converter truncates sub-micro to precision") {
+    val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789")
+    val negativeEpochLdt = LocalDateTime.parse("1969-12-31T23:59:59.123456789")
+    val cases = Map(7 -> 700, 8 -> 780, 9 -> 789)
+    Seq(ldt, negativeEpochLdt).foreach { input =>
+      cases.foreach { case (p, expectedNanosWithinMicro) =>
+        val dt = TimestampNTZNanosType(p)
+        val v = CatalystTypeConverters.createToCatalystConverter(dt)(input)
+          .asInstanceOf[TimestampNanosVal]
+        assert(v.nanosWithinMicro === expectedNanosWithinMicro,
+          s"input=$input, precision=$p: expected $expectedNanosWithinMicro, " +
+            s"got ${v.nanosWithinMicro}")
+      }
+    }
+  }
+
+  test("SPARK-57033: TimestampLTZNanosType converter truncates sub-micro to precision") {
+    val instant = Instant.parse("2019-02-26T16:56:00.123456789Z")
+    val negativeEpochInstant = Instant.parse("1969-12-31T23:59:59.123456789Z")
+    val cases = Map(7 -> 700, 8 -> 780, 9 -> 789)
+    Seq(instant, negativeEpochInstant).foreach { input =>
+      cases.foreach { case (p, expectedNanosWithinMicro) =>
+        val dt = TimestampLTZNanosType(p)
+        val v = CatalystTypeConverters.createToCatalystConverter(dt)(input)
+          .asInstanceOf[TimestampNanosVal]
+        assert(v.nanosWithinMicro === expectedNanosWithinMicro,
+          s"input=$input, precision=$p: expected $expectedNanosWithinMicro, " +
+            s"got ${v.nanosWithinMicro}")
+      }
+    }
+  }
+
+  test("SPARK-57033: TimestampNTZNanosType converter rejects wrong external type") {
+    val dt = TimestampNTZNanosType(9)
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        CatalystTypeConverters.createToCatalystConverter(dt)("not-a-LocalDateTime")
+      },
+      condition = "INVALID_EXTERNAL_VALUE",
+      parameters = Map(
+        "other" -> "not-a-LocalDateTime",
+        "otherClass" -> "java.lang.String",
+        "dataType" -> "TIMESTAMP_NTZ(9)"))
+    // An `Instant` is also not accepted - the NTZ nano converter is wall-clock only.
+    val instant = Instant.parse("2019-02-26T16:56:00.123456789Z")
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        CatalystTypeConverters.createToCatalystConverter(dt)(instant)
+      },
+      condition = "INVALID_EXTERNAL_VALUE",
+      parameters = Map(
+        "other" -> instant.toString,
+        "otherClass" -> "java.time.Instant",
+        "dataType" -> "TIMESTAMP_NTZ(9)"))
+  }
+
+  test("SPARK-57033: TimestampLTZNanosType converter rejects wrong external type") {
+    val dt = TimestampLTZNanosType(9)
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        CatalystTypeConverters.createToCatalystConverter(dt)("not-an-Instant")
+      },
+      condition = "INVALID_EXTERNAL_VALUE",
+      parameters = Map(
+        "other" -> "not-an-Instant",
+        "otherClass" -> "java.lang.String",
+        "dataType" -> "TIMESTAMP_LTZ(9)"))
+    // A `LocalDateTime` is also not accepted - LTZ requires an absolute instant.
+    val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789")
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        CatalystTypeConverters.createToCatalystConverter(dt)(ldt)
+      },
+      condition = "INVALID_EXTERNAL_VALUE",
+      parameters = Map(
+        "other" -> ldt.toString,
+        "otherClass" -> "java.time.LocalDateTime",
+        "dataType" -> "TIMESTAMP_LTZ(9)"))
+    // The legacy `java.sql.Timestamp` external type is intentionally out of scope for
+    // `TimestampLTZNanosType` (see SPARK-57033). Verify it is rejected, not silently
+    // accepted via a fallback.
+    val ts = java.sql.Timestamp.from(Instant.parse("2019-02-26T16:56:00.123456789Z"))
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        CatalystTypeConverters.createToCatalystConverter(dt)(ts)
+      },
+      condition = "INVALID_EXTERNAL_VALUE",
+      parameters = Map(
+        "other" -> ts.toString,
+        "otherClass" -> "java.sql.Timestamp",
+        "dataType" -> "TIMESTAMP_LTZ(9)"))
+  }
+
+  test("SPARK-57033: nested nanos timestamp types in arrays / maps / structs") {
+    val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789")
+    val instant = Instant.parse("2019-02-26T16:56:00.987654321Z")
+    val ldtNano = DateTimeUtils.localDateTimeToTimestampNanos(ldt, precision = 9)
+    val instantNano = DateTimeUtils.instantToTimestampNanos(instant, precision = 9)
+
+    // Array of TimestampNTZNanosType.
+    val ntzArrayType = ArrayType(TimestampNTZNanosType(9), containsNull = true)
+    val ldts = Seq(ldt, null, ldt)
+    val catArr = CatalystTypeConverters.createToCatalystConverter(ntzArrayType)(ldts)
+      .asInstanceOf[GenericArrayData]
+    assert(catArr.numElements() === 3)
+    assert(catArr.get(0, TimestampNTZNanosType(9)) === ldtNano)
+    assert(catArr.isNullAt(1))
+    assert(catArr.get(2, TimestampNTZNanosType(9)) === ldtNano)
+    assert(CatalystTypeConverters.createToScalaConverter(ntzArrayType)(catArr) === ldts)
+
+    // Map of (String -> TimestampLTZNanosType).
+    val ltzMapType = MapType(StringType, TimestampLTZNanosType(9), valueContainsNull = true)
+    val instants = Map[String, Instant]("a" -> instant, "b" -> null)
+    val catMap = CatalystTypeConverters.createToCatalystConverter(ltzMapType)(instants)
+    val backMap = CatalystTypeConverters.createToScalaConverter(ltzMapType)(catMap)
+      .asInstanceOf[Map[Any, Any]]
+    assert(backMap("a") === instant)
+    assert(backMap("b") == null)
+
+    // Struct with both nano fields plus a non-nano field, and a null row.
+    val nestedStruct = StructType(
+      StructField("ntz", TimestampNTZNanosType(9), nullable = true) ::
+        StructField("ltz", TimestampLTZNanosType(9), nullable = true) ::
+        StructField("name", StringType, nullable = true) :: Nil)
+    val outerStruct = StructType(StructField("inner", nestedStruct, nullable = true) :: Nil)
+    val outerToCat = CatalystTypeConverters.createToCatalystConverter(outerStruct)
+    val outerToScala = CatalystTypeConverters.createToScalaConverter(outerStruct)
+    val outerRow = Row(Row(ldt, instant, "abc"))
+    val nullOuterRow = Row(null)
+    assert(outerToScala(outerToCat(outerRow)) === outerRow)
+    assert(outerToScala(outerToCat(nullOuterRow)) === nullOuterRow)
+  }
+
   test("converting java.time.LocalDate to DateType") {
     Seq(
       "0101-02-16",
@@ -579,7 +817,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
       exception = intercept[SparkIllegalArgumentException] {
         CatalystTypeConverters.createToCatalystConverter(gt)("test")
       },
-      condition = "_LEGACY_ERROR_TEMP_3219",
+      condition = "INVALID_EXTERNAL_VALUE",
       parameters = Map(
         "other" -> "test",
         "otherClass" -> "java.lang.String",
@@ -592,7 +830,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
       exception = intercept[SparkIllegalArgumentException] {
         CatalystTypeConverters.createToCatalystConverter(gt)("test")
       },
-      condition = "_LEGACY_ERROR_TEMP_3219",
+      condition = "INVALID_EXTERNAL_VALUE",
       parameters = Map(
         "other" -> "test",
         "otherClass" -> "java.lang.String",
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
index 09247a459b9c..69f4995220fe 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.encoders
 import scala.collection.mutable
 import scala.util.Random
 
-import org.apache.spark.SparkRuntimeException
+import org.apache.spark.{SparkException, SparkRuntimeException}
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.CodegenInterpretedPlanTest
@@ -381,6 +381,76 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
     assert(readback.get(0) === localDateTime)
   }
 
+  test("SPARK-57033: encoding/decoding TimestampNTZNanosType to/from java.time.LocalDateTime") {
+    val inputs = Seq(
+      java.time.LocalDateTime.parse("2019-02-26T16:56:00.123456789"),
+      java.time.LocalDateTime.parse("1969-12-31T23:59:59.123456789"))
+    for (localDateTime <- inputs) {
+      for (p <- TimestampNTZNanosType.MIN_PRECISION to TimestampNTZNanosType.MAX_PRECISION) {
+        val schema = new StructType().add("t", TimestampNTZNanosType(p))
+        val encoder = ExpressionEncoder(schema).resolveAndBind()
+        val row = toRow(encoder, Row(localDateTime))
+        val expected = DateTimeUtils.localDateTimeToTimestampNanos(localDateTime, p)
+        assert(row.getTimestampNTZNanos(0) === expected)
+        val readback = fromRow(encoder, row)
+        assert(readback.get(0) === DateTimeUtils.timestampNanosToLocalDateTime(expected))
+      }
+    }
+  }
+
+  test("SPARK-57033: encoding/decoding TimestampLTZNanosType to/from java.time.Instant") {
+    val inputs = Seq(
+      java.time.Instant.parse("2019-02-26T16:56:00.123456789Z"),
+      java.time.Instant.parse("1969-12-31T23:59:59.123456789Z"))
+    for (instant <- inputs) {
+      for (p <- TimestampLTZNanosType.MIN_PRECISION to TimestampLTZNanosType.MAX_PRECISION) {
+        val schema = new StructType().add("t", TimestampLTZNanosType(p))
+        val encoder = ExpressionEncoder(schema).resolveAndBind()
+        val row = toRow(encoder, Row(instant))
+        val expected = DateTimeUtils.instantToTimestampNanos(instant, p)
+        assert(row.getTimestampLTZNanos(0) === expected)
+        val readback = fromRow(encoder, row)
+        assert(readback.get(0) === DateTimeUtils.timestampNanosToInstant(expected))
+      }
+    }
+  }
+
+  test("SPARK-57033: encoding/decoding TimestampLTZNanosType ignores java8 API flag") {
+    val instant = java.time.Instant.parse("2019-02-26T16:56:00.123456789Z")
+    val schema = new StructType().add("t", TimestampLTZNanosType())
+    Seq("true", "false").foreach { flag =>
+      withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> flag) {
+        val encoder = ExpressionEncoder(schema).resolveAndBind()
+        val row = toRow(encoder, Row(instant))
+        assert(row.getTimestampLTZNanos(0) ===
+          DateTimeUtils.instantToTimestampNanos(instant, precision = 9))
+        val readback = fromRow(encoder, row)
+        assert(readback.get(0) === instant)
+      }
+    }
+  }
+
+  test("SPARK-57033: RowEncoder rejects nanos timestamp types when feature flag is off") {
+    Seq(
+      new StructType().add("t", TimestampNTZNanosType()),
+      new StructType().add("t", TimestampLTZNanosType())
+    ).foreach { schema =>
+      withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "false") {
+        checkError(
+          exception = intercept[SparkException] {
+            ExpressionEncoder(schema)
+          },
+          condition = "FEATURE_NOT_ENABLED",
+          parameters = Map(
+            "featureName" -> "Nanosecond-precision timestamp types",
+            "configKey" -> "spark.sql.timestampNanosTypes.enabled",
+            "configValue" -> "true"
+          )
+        )
+      }
+    }
+  }
+
   test("encoding/decoding DateType to/from java.time.LocalDate") {
     withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
       val schema = new StructType().add("d", DateType)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 4810ec69bb96..47eb4a1e3e3c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.errors.DataTypeErrors.toSQLConf
 import org.apache.spark.sql.internal.SqlApiConf
 import org.apache.spark.sql.types.DayTimeIntervalType.{HOUR, SECOND}
 import org.apache.spark.sql.types.Decimal
-import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+import org.apache.spark.unsafe.types.{CalendarInterval, TimestampNanosVal, UTF8String}
 
 class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
 
@@ -1851,4 +1851,150 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
       timeBucketYMInterval(1, Long.MinValue, 0L, utc)
     }
   }
+
+  test("SPARK-57033: java.time.LocalDateTime <-> TimestampNanosVal roundtrip") {
+    val cases = Seq(
+      LocalDateTime.parse("0001-01-01T00:00:00"),
+      LocalDateTime.parse("1582-10-02T01:02:03.04"),
+      LocalDateTime.parse("1582-12-31T23:59:59.999999999"),
+      LocalDateTime.parse("1969-12-31T23:59:59.999999999"),
+      LocalDateTime.parse("1970-01-01T00:00:00"),
+      LocalDateTime.parse("1970-01-01T00:00:00.000000001"),
+      LocalDateTime.parse("1970-01-01T00:00:00.123456789"),
+      LocalDateTime.parse("2019-02-26T16:56:00.123456789"),
+      LocalDateTime.parse("9999-12-31T23:59:59.999999999"))
+    for (ldt <- cases) {
+      val v = localDateTimeToTimestampNanos(ldt, precision = 9)
+      assert(v.nanosWithinMicro >= 0 && v.nanosWithinMicro <= 999,
+        s"nanosWithinMicro out of range for $ldt: $v")
+      assert(v.nanosWithinMicro === (ldt.getNano % 1000).toShort)
+      assert(v.epochMicros === DateTimeUtils.localDateTimeToMicros(ldt))
+      assert(timestampNanosToLocalDateTime(v) === ldt)
+    }
+  }
+
+  test("SPARK-57033: java.time.Instant <-> TimestampNanosVal roundtrip") {
+    val cases = Seq(
+      Instant.EPOCH,
+      Instant.parse("0001-01-01T00:00:00Z"),
+      Instant.parse("1582-10-02T01:02:03.04Z"),
+      Instant.parse("1582-12-31T23:59:59.999999999Z"),
+      Instant.parse("1969-12-31T23:59:59.999999999Z"),
+      Instant.parse("1970-01-01T00:00:00.000000001Z"),
+      Instant.parse("2019-02-26T16:56:00.123456789Z"),
+      Instant.parse("9999-12-31T23:59:59.999999999Z"))
+    for (i <- cases) {
+      val v = instantToTimestampNanos(i, precision = 9)
+      assert(v.nanosWithinMicro >= 0 && v.nanosWithinMicro <= 999,
+        s"nanosWithinMicro out of range for $i: $v")
+      assert(v.nanosWithinMicro === (i.getNano % 1000).toShort)
+      assert(v.epochMicros === instantToMicros(i))
+      assert(timestampNanosToInstant(v) === i)
+    }
+  }
+
+  test("SPARK-57033: TimestampNanosVal random roundtrip") {
+    val rnd = new scala.util.Random(0)
+    // Random instants across the valid range with every possible sub-micro digit.
+    val min = Instant.parse("0001-01-01T00:00:00Z").getEpochSecond
+    val max = Instant.parse("9999-12-31T23:59:59.999999999Z").getEpochSecond
+    for (_ <- 0 until 200) {
+      val secs = min + math.abs(rnd.nextLong()) % (max - min + 1)
+      val nano = rnd.nextInt(1_000_000_000)
+      val i = Instant.ofEpochSecond(secs, nano.toLong)
+      val v = instantToTimestampNanos(i, precision = 9)
+      assert(timestampNanosToInstant(v) === i)
+      val ldt = LocalDateTime.ofInstant(i, ZoneOffset.UTC)
+      val v2 = localDateTimeToTimestampNanos(ldt, precision = 9)
+      assert(timestampNanosToLocalDateTime(v2) === ldt)
+      // Internal layout matches direct decomposition.
+      assert(v === TimestampNanosVal.fromParts(instantToMicros(i), (nano % 1000).toShort))
+    }
+  }
+
+  test("SPARK-57033: localDateTimeToTimestampNanos truncates sub-micro to precision") {
+    val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789")
+    // precision 9 keeps all 3 sub-micro digits, 8 drops the last, 7 drops the last two.
+    assert(localDateTimeToTimestampNanos(ldt, precision = 9).nanosWithinMicro === 789)
+    assert(localDateTimeToTimestampNanos(ldt, precision = 8).nanosWithinMicro === 780)
+    assert(localDateTimeToTimestampNanos(ldt, precision = 7).nanosWithinMicro === 700)
+    // epochMicros is unaffected by sub-micro truncation.
+    val expectedMicros = DateTimeUtils.localDateTimeToMicros(ldt)
+    for (p <- 7 to 9) {
+      assert(localDateTimeToTimestampNanos(ldt, precision = p).epochMicros === expectedMicros)
+    }
+
+    val negativeEpochLdt = LocalDateTime.parse("1969-12-31T23:59:59.123456789")
+    assert(localDateTimeToTimestampNanos(negativeEpochLdt, precision = 9).nanosWithinMicro === 789)
+    assert(localDateTimeToTimestampNanos(negativeEpochLdt, precision = 8).nanosWithinMicro === 780)
+    assert(localDateTimeToTimestampNanos(negativeEpochLdt, precision = 7).nanosWithinMicro === 700)
+    val negativeExpectedMicros = DateTimeUtils.localDateTimeToMicros(negativeEpochLdt)
+    for (p <- 7 to 9) {
+      assert(localDateTimeToTimestampNanos(negativeEpochLdt, precision = p).epochMicros ===
+        negativeExpectedMicros)
+    }
+  }
+
+  test("SPARK-57033: instantToTimestampNanos truncates sub-micro to precision") {
+    val i = Instant.parse("2019-02-26T16:56:00.123456789Z")
+    assert(instantToTimestampNanos(i, precision = 9).nanosWithinMicro === 789)
+    assert(instantToTimestampNanos(i, precision = 8).nanosWithinMicro === 780)
+    assert(instantToTimestampNanos(i, precision = 7).nanosWithinMicro === 700)
+    val expectedMicros = instantToMicros(i)
+    for (p <- 7 to 9) {
+      assert(instantToTimestampNanos(i, precision = p).epochMicros === expectedMicros)
+    }
+
+    val negativeEpochInstant = Instant.parse("1969-12-31T23:59:59.123456789Z")
+    assert(instantToTimestampNanos(negativeEpochInstant, precision = 9).nanosWithinMicro === 789)
+    assert(instantToTimestampNanos(negativeEpochInstant, precision = 8).nanosWithinMicro === 780)
+    assert(instantToTimestampNanos(negativeEpochInstant, precision = 7).nanosWithinMicro === 700)
+    val negativeExpectedMicros = instantToMicros(negativeEpochInstant)
+    for (p <- 7 to 9) {
+      assert(instantToTimestampNanos(negativeEpochInstant, precision = p).epochMicros ===
+        negativeExpectedMicros)
+    }
+  }
+
+  test("SPARK-57033: random roundtrip across precisions floors to the precision step") {
+    val rnd = new scala.util.Random(0)
+    val min = Instant.parse("0001-01-01T00:00:00Z").getEpochSecond
+    val max = Instant.parse("9999-12-31T23:59:59.999999999Z").getEpochSecond
+    // For each random instant, verify both helpers floor the original nanosecond value
+    // (toward `-inf`) to the precision step `10^(9 - p)` ns. The whole-instant nanosecond
+    // count overflows `Long` for far-future dates, so we check the floor on the components
+    // instead: `epochMicros` is invariant across precisions (matches `instantToMicros`) and
+    // the sub-micro nanosecond residual is floored to the precision step.
+    for (_ <- 0 until 10) {
+      val secs = min + math.abs(rnd.nextLong()) % (max - min + 1)
+      val nano = rnd.nextInt(1_000_000_000)
+      val instant = Instant.ofEpochSecond(secs, nano.toLong)
+      val ldt = LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
+      val expectedMicros = instantToMicros(instant)
+      val rawSubMicro = (nano % 1000).toShort
+      for (p <- 7 to 9) {
+        val step = math.pow(10, 9 - p).toLong.toShort
+        val expectedSubMicro = ((rawSubMicro / step) * step).toShort
+
+        val ltz = instantToTimestampNanos(instant, p)
+        assert(ltz.epochMicros === expectedMicros,
+          s"LTZ p=$p instant=$instant epochMicros=${ltz.epochMicros}")
+        assert(ltz.nanosWithinMicro === expectedSubMicro,
+          s"LTZ p=$p instant=$instant nanosWithinMicro=${ltz.nanosWithinMicro}")
+        // Roundtrip preserves the truncated value.
+        val ltzBack = timestampNanosToInstant(ltz)
+        assert(instantToMicros(ltzBack) === expectedMicros)
+        assert(ltzBack.getNano % 1000 === expectedSubMicro)
+
+        val ntz = localDateTimeToTimestampNanos(ldt, p)
+        assert(ntz.epochMicros === expectedMicros,
+          s"NTZ p=$p ldt=$ldt epochMicros=${ntz.epochMicros}")
+        assert(ntz.nanosWithinMicro === expectedSubMicro,
+          s"NTZ p=$p ldt=$ldt nanosWithinMicro=${ntz.nanosWithinMicro}")
+        val ntzBack = timestampNanosToLocalDateTime(ntz)
+        assert(DateTimeUtils.localDateTimeToMicros(ntzBack) === expectedMicros)
+        assert(ntzBack.getNano % 1000 === expectedSubMicro)
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 62d44e0af8b0..416c92602b83 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -789,6 +789,24 @@ public class JavaDatasetSuite implements Serializable {
     Assertions.assertEquals(data, ds.collectAsList());
   }
 
+  @Test
+  public void testTimestampNanosRowEncoder() {
+    final StructType schema = new StructType()
+        .add("ntz", org.apache.spark.sql.types.TimestampNTZNanosType.apply())
+        .add("ltz", org.apache.spark.sql.types.TimestampLTZNanosType.apply());
+    LocalDateTime ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789");
+    Instant instant = Instant.parse("2019-02-26T16:56:00.987654321Z");
+    List<Row> rows = Arrays.asList(create(ldt, instant), create(null, null));
+    Dataset<Row> ds = spark.createDataset(rows, Encoders.row(schema));
+    Assertions.assertEquals(schema, ds.schema());
+    List<Row> collected = ds.collectAsList();
+    Assertions.assertEquals(2, collected.size());
+    Assertions.assertEquals(ldt, collected.get(0).get(0));
+    Assertions.assertEquals(instant, collected.get(0).get(1));
+    Assertions.assertTrue(collected.get(1).isNullAt(0));
+    Assertions.assertTrue(collected.get(1).isNullAt(1));
+  }
+
   @Test
   public void testDurationEncoder() {
     Encoder<Duration> encoder = Encoders.DURATION();
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 879569045b6d..dc930af87490 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
 
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.sql.{Date, Timestamp}
+import java.time.{Instant, LocalDateTime}
 
 import scala.collection.immutable.HashSet
 import scala.collection.mutable
@@ -2631,6 +2632,49 @@ class DatasetSuite extends SharedSparkSession
     assert(Seq(localDateTime).toDS().head() === localDateTime)
   }
 
+  test("SPARK-57033: Dataset[Row] roundtrip preserves nanosecond precision") {
+    val schema = new StructType()
+      .add("ntz", TimestampNTZNanosType(9), nullable = true)
+      .add("ltz", TimestampLTZNanosType(9), nullable = true)
+    val rows = Seq(
+      Row(
+        LocalDateTime.parse("2019-02-26T16:56:00.123456789"),
+        Instant.parse("2019-02-26T16:56:00.987654321Z")),
+      Row(
+        LocalDateTime.parse("9999-12-31T23:59:59.999999999"),
+        Instant.parse("9999-12-31T23:59:59.999999999Z")),
+      Row(null, null))
+    val df = spark.createDataFrame(rows.asJava, schema)
+    assert(df.schema === schema)
+    checkAnswer(df, rows)
+  }
+
+  test("SPARK-57033: Dataset[Row] roundtrip truncates sub-micro to declared precision") {
+    val ldt = LocalDateTime.parse("2019-02-26T16:56:00.123456789")
+    val instant = Instant.parse("2019-02-26T16:56:00.123456789Z")
+    val negativeEpochLdt = LocalDateTime.parse("1969-12-31T23:59:59.123456789")
+    val negativeEpochInstant = Instant.parse("1969-12-31T23:59:59.123456789Z")
+    // At p=7 the last two sub-micro digits are dropped (789 -> 700);
+    // at p=8 only the last one is dropped (789 -> 780).
+    val expectedSubMicro = Map(7 -> 700, 8 -> 780)
+
+    for (p <- 7 to 8) {
+      val schema = new StructType()
+        .add("ntz", TimestampNTZNanosType(p), nullable = true)
+        .add("ltz", TimestampLTZNanosType(p), nullable = true)
+      val rows = Seq(
+        Row(ldt, instant),
+        Row(negativeEpochLdt, negativeEpochInstant))
+      val df = spark.createDataFrame(rows.asJava, schema)
+      assert(df.schema === schema)
+      val drop = 789 - expectedSubMicro(p)
+      val expected = Seq(
+        Row(ldt.minusNanos(drop), instant.minusNanos(drop)),
+        Row(negativeEpochLdt.minusNanos(drop), negativeEpochInstant.minusNanos(drop)))
+      checkAnswer(df, expected)
+    }
+  }
+
   test("SPARK-34605: implicit encoder for java.time.Duration") {
     val duration = java.time.Duration.ofMinutes(10)
     assert(spark.range(1).map { _ => duration }.head() === duration)
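
The tests in this diff expect the sub-microsecond residual to be floored to the precision step `10^(9 - p)` ns (789 becomes 700 at p=7 and 780 at p=8). The arithmetic can be sketched in plain Java as below. `SubMicroFloor` and `flooredNanosWithinMicro` are hypothetical names used only for illustration; they are not part of Spark and do not reproduce the actual `DateTimeUtils` implementation.

```java
import java.time.Instant;

final class SubMicroFloor {
  // Hypothetical helper (not a Spark API): returns the nanoseconds within the
  // current microsecond, floored to the step 10^(9 - precision).
  static int flooredNanosWithinMicro(Instant instant, int precision) {
    if (precision < 7 || precision > 9) {
      throw new IllegalArgumentException("precision must be in [7, 9]: " + precision);
    }
    // step is 100 for p=7, 10 for p=8 and 1 for p=9.
    int step = 1;
    for (int i = precision; i < 9; i++) {
      step *= 10;
    }
    // Instant.getNano() is always in [0, 999_999_999], also before the epoch,
    // so the remainder is non-negative and integer division floors it.
    int subMicro = instant.getNano() % 1000;
    return (subMicro / step) * step;
  }

  public static void main(String[] args) {
    Instant beforeEpoch = Instant.parse("1969-12-31T23:59:59.123456789Z");
    for (int p = 7; p <= 9; p++) {
      System.out.println("p=" + p + " -> " + flooredNanosWithinMicro(beforeEpoch, p));
    }
  }
}
```

Because `getNano()` is non-negative even for instants before 1970, the same expression covers the negative-epoch cases exercised above without a separate branch.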


