This is an automated email from the ASF dual-hosted git repository.
MaxGekk pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new 5795ed7beda9 [SPARK-56981][SQL] Add physical representation and
UnsafeRow support for nanosecond timestamps
5795ed7beda9 is described below
commit 5795ed7beda971c9947d96b2e92d8bde5231353f
Author: Maxim Gekk <[email protected]>
AuthorDate: Wed May 27 14:12:33 2026 +0200
[SPARK-56981][SQL] Add physical representation and UnsafeRow support for
nanosecond timestamps
### What changes were proposed in this pull request?
This PR implements the **physical row layer** for nanosecond-capable
timestamp types
([SPARK-56981](https://issues.apache.org/jira/browse/SPARK-56981)), part of
[SPARK-56822](https://issues.apache.org/jira/browse/SPARK-56822) (SPIP:
Timestamps with nanosecond precision).
Logical types `TimestampNTZNanosType(p)` and `TimestampLTZNanosType(p)`
(`p` in \[7, 9\]) landed in #55952 but still mapped to
`UninitializedPhysicalType`, so values could not be stored or read from
`InternalRow` / `UnsafeRow`. This change wires up the minimum physical
infrastructure that downstream work (casts, Parquet, expressions, analysis gating)
can build on.
#### SPIP internal representation
Per the SPIP, a value is a composite of:
- **Epoch microseconds** (`long`, 8 bytes) — same proleptic-Gregorian epoch
microsecond count as existing microsecond timestamps
- **Nanoseconds within that microsecond** (`short`, 0–999) — sub-micro
fractional part, not a full nanosecond offset from epoch
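
As an illustration of the two-part composite, the sketch below splits a `java.time.Instant` into the pair described above and joins it back. The class `NanosSplitSketch` and its `split` / `join` helpers are hypothetical names invented for this example; they are not part of this PR, and the actual `java.time` conversion is tracked separately under SPARK-57033.

```java
import java.time.Instant;

public class NanosSplitSketch {
  // Hypothetical helper: returns {epochMicros, nanosWithinMicro}.
  static long[] split(Instant instant) {
    long seconds = instant.getEpochSecond();
    int nanoOfSecond = instant.getNano(); // always in [0, 999_999_999], also before the epoch
    // Exact arithmetic so out-of-range instants fail loudly instead of wrapping around.
    long epochMicros =
        Math.addExact(Math.multiplyExact(seconds, 1_000_000L), nanoOfSecond / 1_000);
    long nanosWithinMicro = nanoOfSecond % 1_000; // sub-microsecond remainder in [0, 999]
    return new long[] {epochMicros, nanosWithinMicro};
  }

  // Hypothetical inverse of split.
  static Instant join(long epochMicros, long nanosWithinMicro) {
    // floorDiv / floorMod keep the micro-of-second non-negative for pre-epoch values.
    long seconds = Math.floorDiv(epochMicros, 1_000_000L);
    long microOfSecond = Math.floorMod(epochMicros, 1_000_000L);
    return Instant.ofEpochSecond(seconds, microOfSecond * 1_000 + nanosWithinMicro);
  }

  public static void main(String[] args) {
    Instant original = Instant.parse("2026-05-27T12:12:33.123456789Z");
    long[] parts = split(original);
    System.out.println("epochMicros=" + parts[0] + ", nanosWithinMicro=" + parts[1]);
    System.out.println("roundtrip ok: " + join(parts[0], parts[1]).equals(original));
  }
}
```

Because the second component is a remainder rather than a signed offset, it stays in [0, 999] for timestamps before 1970 as well.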
Logical `defaultSize` remains **10 bytes** on the types. In `UnsafeRow`,
values use the same variable-length pattern as `CalendarInterval`: an 8-byte
field slot (`offset << 32 | size`) pointing to a **16-byte** aligned payload
(two 8-byte words: `epochMicros`, then `nanosWithinMicro` in the low 16 bits
with zero padding), so in-place updates remain possible.
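
The layout above can be sketched with a plain `ByteBuffer`. This is a minimal illustration under stated assumptions, not the code in this PR: the real implementation writes through Spark's `Platform` class against the row's base object, while `NanosPayloadSketch` and all of its method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class NanosPayloadSketch {
  static final int PAYLOAD_SIZE = 16; // two 8-byte words

  // Field slot: payload offset in the high 32 bits, payload size in the low 32 bits.
  static long packSlot(int offset, int size) {
    return ((long) offset << 32) | (size & 0xFFFFFFFFL);
  }

  static int slotOffset(long slot) {
    return (int) (slot >>> 32);
  }

  static int slotSize(long slot) {
    return (int) slot;
  }

  // Word 0 holds epochMicros; word 1 holds nanosWithinMicro in its low 16 bits.
  static void writePayload(ByteBuffer buf, int offset, long epochMicros, short nanosWithinMicro) {
    buf.putLong(offset, epochMicros);
    // Masking makes the zero padding explicit regardless of the input value.
    buf.putLong(offset + 8, nanosWithinMicro & 0xFFFFL);
  }

  static long readEpochMicros(ByteBuffer buf, int offset) {
    return buf.getLong(offset);
  }

  static short readNanos(ByteBuffer buf, int offset) {
    return (short) buf.getLong(offset + 8); // truncate to the low 16 bits
  }

  public static void main(String[] args) {
    // Native byte order is an assumption made here to mirror unsafe memory access.
    ByteBuffer buf = ByteBuffer.allocate(64).order(ByteOrder.nativeOrder());
    long slot = packSlot(24, PAYLOAD_SIZE);
    writePayload(buf, slotOffset(slot), 1_700_000_000_000_000L, (short) 123);
    // The payload has a fixed size, so a later value can overwrite it in place.
    writePayload(buf, slotOffset(slot), 1_700_000_000_000_001L, (short) 999);
    System.out.println(readEpochMicros(buf, 24) + " us + " + readNanos(buf, 24) + " ns");
  }
}
```

The fixed 16-byte size is what allows the overwrite in `main`: the slot keeps pointing at the same offset no matter which value is stored.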
#### Implementation summary
| Layer | Change |
|-------|--------|
| **Physical value** | Single shared container [`TimestampNanosVal`](common/unsafe/src/main/java/org/apache/spark/unsafe/types/TimestampNanosVal.java) for both NTZ and LTZ (geometry-style: one byte layout, logical type carries SQL semantics). `fromParts` validates `nanosWithinMicro`; `fromTrustedRowBytes` skips validation on the hot UnsafeRow read path (same convention as `CalendarInterval`, `VariantVal`, etc.). `TimestampNanosVal.ZERO` for analysis defaults. |
| **UnsafeRow payload** | [`TimestampNanosRowValues`](sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/TimestampNanosRowValues.java): shared 16-byte read/write for `UnsafeRow` and `UnsafeArrayData`. |
| **Physical types** | `PhysicalTimestampNTZNanosType` and `PhysicalTimestampLTZNanosType` registered in `PhysicalDataType.applyDefault`; both use `TimestampNanosVal` as `InternalType`. Separate physical types keep NTZ vs LTZ visible to the type system even though storage is identical. |
| **Row access** | Specialized getters/setters on `InternalRow`, `UnsafeRow`, `UnsafeArrayData`, `GenericArrayData`, `JoinedRow`, `ProjectingInternalRow`; codegen (`CodeGenerator`, `GenerateUnsafeProjection`, `UnsafeWriter`, `UnsafeArrayWriter`, `InterpretedUnsafeProjection`, `SpecializedGettersReader`). NTZ/LTZ accessor pairs are byte-identical by design. |
| **Literals** | `Literal.validateLiteralValue`, `Literal.default`, and `componentTypeToDataType` for `TimestampNanosVal`; `GenerateUnsafeProjection.canSupport` cases for both nanos datetime types. |
| **Columnar** | `ColumnVector` / columnar row stubs throw `SparkUnsupportedOperationException` until columnar support is added. |
| **Docs** | Scaladoc on logical types clarifies 10-byte logical size vs 16-byte UnsafeRow payload; physical-type docs note deferred ordering. |
#### Explicitly out of scope (follow-ups)
- **Types Framework** integration (`TypeOps`, etc.) — intentionally
deferred to keep this PR focused; can land in parallel once these row APIs
exist.
- **Physical ordering / compare / hash** on `PhysicalTimestamp*NanosType` —
throws `orderedOperationUnsupportedByDataTypeError` for now.
- **SQL exposure**, preview `SQLConf`, casts, Parquet, and expression
parity — separate sub-tasks under SPARK-56822.
### Why are the changes needed?
Without a concrete physical type and `UnsafeRow` accessors, any code path
that materializes rows for nanosecond timestamps fails or falls through to
`UninitializedPhysicalType`. This unblocks SPARK-57032 (string parsing),
SPARK-57033 (`java.time` / Dataset roundtrip), and later storage and expression
work.
### Does this PR introduce _any_ user-facing change?
No. Logical types exist but are not yet exposed through SQL; behavior of
`TimestampType`, `TimestampNTZType`, and microsecond storage is unchanged.
### How was this patch tested?
Unit tests added/extended:
- `build/sbt 'unsafe/testOnly *TimestampNanosSuite'` — `TimestampNanosVal`
equality/`hashCode`, validation, boundary `nanosWithinMicro`,
`fromTrustedRowBytes` trust contract, constants (`ZERO`,
`MAX_NANOS_WITHIN_MICRO`).
- `build/sbt 'catalyst/testOnly *TimestampNanosRowValuesSuite'` —
byte-level layout of `TimestampNanosRowValues` (word order, padding, extreme
`epochMicros`, cursor offsets).
- `build/sbt 'catalyst/testOnly *TimestampNanosRowSuite'` —
`GenericInternalRow` roundtrip; codegen + interpreted `UnsafeRow` roundtrip
(NTZ/LTZ, null/non-null, in-place update, null-at-creation / SPARK-41535-style
regression); `UnsafeArrayWriter` for nullable/non-nullable NTZ and LTZ arrays;
literal validation and `checkEvaluation`; extreme `Long.MinValue` /
`Long.MaxValue` epoch micros.
- `build/sbt 'catalyst/testOnly org.apache.spark.sql.types.DataTypeSuite'`
— `PhysicalDataType` is not `UninitializedPhysicalType` for `p` in {7, 8, 9};
`defaultSize` remains 10.
- `LiteralExpressionSuite` — `Literal.default` for nanos timestamp types.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor Auto
Closes #56059 from MaxGekk/nanos-in-rows.
Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
(cherry picked from commit 683fd83ae77a58b9ae7a22b78dd53bcad18ad820)
Signed-off-by: Max Gekk <[email protected]>
---
.../spark/unsafe/types/TimestampNanosVal.java | 122 +++++++++++++
.../spark/unsafe/types/TimestampNanosSuite.java | 78 ++++++++
.../spark/sql/types/TimestampLTZNanosType.scala | 4 +
.../spark/sql/types/TimestampNTZNanosType.scala | 4 +
.../catalyst/expressions/SpecializedGetters.java | 7 +
.../expressions/SpecializedGettersReader.java | 6 +
.../expressions/TimestampNanosRowValues.java | 93 ++++++++++
.../sql/catalyst/expressions/UnsafeArrayData.java | 17 ++
.../spark/sql/catalyst/expressions/UnsafeRow.java | 67 ++++++-
.../expressions/codegen/UnsafeArrayWriter.java | 11 ++
.../catalyst/expressions/codegen/UnsafeWriter.java | 15 ++
.../apache/spark/sql/vectorized/ColumnVector.java | 10 +
.../apache/spark/sql/vectorized/ColumnarArray.java | 11 ++
.../spark/sql/vectorized/ColumnarBatchRow.java | 11 ++
.../apache/spark/sql/vectorized/ColumnarRow.java | 11 ++
.../apache/spark/sql/catalyst/InternalRow.scala | 22 ++-
.../spark/sql/catalyst/ProjectingInternalRow.scala | 8 +
.../expressions/InterpretedUnsafeProjection.scala | 10 +
.../spark/sql/catalyst/expressions/JoinedRow.scala | 8 +
.../expressions/codegen/CodeGenerator.scala | 9 +
.../codegen/GenerateUnsafeProjection.scala | 14 +-
.../spark/sql/catalyst/expressions/literals.scala | 5 +
.../spark/sql/catalyst/expressions/rows.scala | 2 +
.../sql/catalyst/types/PhysicalDataType.scala | 46 ++++-
.../spark/sql/catalyst/util/GenericArrayData.scala | 2 +
.../spark/sql/catalyst/util/UnsafeRowUtils.scala | 1 +
.../expressions/LiteralExpressionSuite.scala | 6 +-
.../expressions/TimestampNanosRowSuite.scala | 202 +++++++++++++++++++++
.../expressions/TimestampNanosRowValuesSuite.scala | 94 ++++++++++
.../codegen/GenerateUnsafeProjectionSuite.scala | 4 +
.../org/apache/spark/sql/types/DataTypeSuite.scala | 9 +
.../execution/vectorized/MutableColumnarRow.java | 11 ++
32 files changed, 914 insertions(+), 6 deletions(-)
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/TimestampNanosVal.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/TimestampNanosVal.java
new file mode 100644
index 000000000000..973547f1d9f3
--- /dev/null
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/TimestampNanosVal.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.types;
+
+import org.apache.spark.SparkIllegalArgumentException;
+import org.apache.spark.annotation.Unstable;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Physical representation for nanosecond-capable timestamp types ({@code TIMESTAMP_NTZ(p)} and
+ * {@code TIMESTAMP_LTZ(p)} with {@code p} in [7, 9]).
+ *
+ * <p>Values are stored as two components:
+ * <ul>
+ * <li>{@link #epochMicros} - microseconds since the Unix epoch (same unit as microsecond
+ * timestamp types),</li>
+ * <li>{@link #nanosWithinMicro} - additional nanoseconds within that microsecond, in [0, 999].
+ * </li>
+ * </ul>
+ *
+ * <p>Logical row-size estimation uses 10 bytes (8 + 2). In {@code UnsafeRow}, values are stored in
+ * the variable-length region using a 16-byte payload (see
+ * {@link org.apache.spark.sql.catalyst.expressions.TimestampNanosRowValues}), the same pattern as
+ * {@link CalendarInterval}.
+ *
+ * @since 4.3.0
+ */
+@Unstable
+public final class TimestampNanosVal implements Serializable {
+ /** Size of the {@code UnsafeRow} variable-length payload for this type (two 8-byte words). */
+ public static final int SIZE_IN_BYTES = 16;
+
+ /** Maximum valid value for {@link #nanosWithinMicro} (three sub-micro decimal digits). */
+ public static final int MAX_NANOS_WITHIN_MICRO = 999;
+
+ /** Shared zero value, suitable for default-value lookups during analysis. */
+ public static final TimestampNanosVal ZERO = new TimestampNanosVal(0L, (short) 0);
+
+ /** Microseconds since the Unix epoch. */
+ public final long epochMicros;
+ /** Nanoseconds within {@link #epochMicros}, in [0, 999]. */
+ public final short nanosWithinMicro;
+
+ /**
+ * Equivalent to {@link #fromParts}; prefer the factory for new callers. Kept for symmetry with
+ * other value types (e.g. {@link CalendarInterval}) and for callers that need a constructor.
+ *
+ * @param epochMicros microseconds since the Unix epoch
+ * @param nanosWithinMicro nanoseconds within {@code epochMicros}, must be in [0, 999]
+ */
+ public TimestampNanosVal(long epochMicros, short nanosWithinMicro) {
+ this(epochMicros, nanosWithinMicro, /* trusted */ false);
+ }
+
+ private TimestampNanosVal(long epochMicros, short nanosWithinMicro, boolean trusted) {
+ if (!trusted && (nanosWithinMicro < 0 || nanosWithinMicro > MAX_NANOS_WITHIN_MICRO)) {
+ throw new SparkIllegalArgumentException(
+ "INTERNAL_ERROR",
+ Map.of(
+ "message",
+ "nanosWithinMicro must be in [0, " + MAX_NANOS_WITHIN_MICRO + "], got: "
+ + nanosWithinMicro));
+ }
+ this.epochMicros = epochMicros;
+ this.nanosWithinMicro = nanosWithinMicro;
+ }
+
+ /**
+ * Creates a non-null value from its components.
+ */
+ public static TimestampNanosVal fromParts(long epochMicros, short nanosWithinMicro) {
+ return new TimestampNanosVal(epochMicros, nanosWithinMicro);
+ }
+
+ /**
+ * Trusted factory for the row-read path. Skips the {@link #nanosWithinMicro} range check
+ * because every value reaching this entry point was already validated when it entered the
+ * row (the only paths to one go through the validating constructor / {@link #fromParts}).
+ * Intended for internal callers like
+ * {@link org.apache.spark.sql.catalyst.expressions.TimestampNanosRowValues}; do not use from
+ * SQL- or user-facing code.
+ */
+ public static TimestampNanosVal fromTrustedRowBytes(long epochMicros, short nanosWithinMicro) {
+ return new TimestampNanosVal(epochMicros, nanosWithinMicro, /* trusted */ true);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof TimestampNanosVal that)) return false;
+ return epochMicros == that.epochMicros && nanosWithinMicro == that.nanosWithinMicro;
+ }
+
+ @Override
+ public int hashCode() {
+ // Manual mix, not Objects.hash: avoids the varargs array + autoboxing on every call. This
+ // shows up on hash-bound paths (HashAggregate, HashJoin, distinct, set membership).
+ return 31 * Long.hashCode(epochMicros) + nanosWithinMicro;
+ }
+
+ @Override
+ public String toString() {
+ return "TimestampNanosVal(" + epochMicros + ", " + nanosWithinMicro + ")";
+ }
+}
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/TimestampNanosSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/TimestampNanosSuite.java
new file mode 100644
index 000000000000..8a37767add9e
--- /dev/null
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/TimestampNanosSuite.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.types;
+
+import org.apache.spark.SparkIllegalArgumentException;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class TimestampNanosSuite {
+
+ @Test
+ public void timestampNanosValEqualsAndHashCode() {
+ TimestampNanosVal t1 = TimestampNanosVal.fromParts(1000L, (short) 100);
+ TimestampNanosVal t2 = TimestampNanosVal.fromParts(1001L, (short) 100);
+ TimestampNanosVal t3 = TimestampNanosVal.fromParts(1000L, (short) 101);
+ TimestampNanosVal t4 = TimestampNanosVal.fromParts(1000L, (short) 100);
+
+ assertNotEquals(t1, t2);
+ assertNotEquals(t1, t3);
+ assertEquals(t1, t4);
+ assertEquals(t1.hashCode(), t4.hashCode());
+ }
+
+ @Test
+ public void invalidNanosWithinMicro() {
+ assertThrows(SparkIllegalArgumentException.class,
+ () -> TimestampNanosVal.fromParts(0L, (short) -1));
+ assertThrows(SparkIllegalArgumentException.class,
+ () -> TimestampNanosVal.fromParts(0L, (short) 1000));
+ }
+
+ @Test
+ public void boundaryNanosWithinMicro() {
+ assertEquals((short) 0, TimestampNanosVal.fromParts(0L, (short) 0).nanosWithinMicro);
+ assertEquals((short) TimestampNanosVal.MAX_NANOS_WITHIN_MICRO,
+ TimestampNanosVal.fromParts(0L, (short) TimestampNanosVal.MAX_NANOS_WITHIN_MICRO)
+ .nanosWithinMicro);
+ }
+
+ @Test
+ public void fromTrustedRowBytesSkipsValidation() {
+ // Documents the trust contract: callers on the row-read path may pass any short without
+ // the [0, 999] check. The fields are stored as-given.
+ TimestampNanosVal v = TimestampNanosVal.fromTrustedRowBytes(0L, (short) 1234);
+ assertEquals((short) 1234, v.nanosWithinMicro);
+ }
+
+ @Test
+ public void equalsHandlesNullAndOtherTypes() {
+ TimestampNanosVal v = TimestampNanosVal.fromParts(0L, (short) 0);
+ assertNotEquals(v, null);
+ assertNotEquals(v, "not a timestamp");
+ }
+
+ @Test
+ public void constants() {
+ assertEquals(16, TimestampNanosVal.SIZE_IN_BYTES);
+ assertEquals(999, TimestampNanosVal.MAX_NANOS_WITHIN_MICRO);
+ assertEquals(0L, TimestampNanosVal.ZERO.epochMicros);
+ assertEquals((short) 0, TimestampNanosVal.ZERO.nanosWithinMicro);
+ }
+}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/TimestampLTZNanosType.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/TimestampLTZNanosType.scala
index 7d65a492f544..66bb238ba01f 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/TimestampLTZNanosType.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/TimestampLTZNanosType.scala
@@ -44,6 +44,10 @@ case class TimestampLTZNanosType(precision: Int) extends DatetimeType {
/**
* Default size used by Spark for row-size estimation. Values are represented logically as epoch
* microseconds (Long, 8 bytes) plus nanoseconds within that micro (Short, 2 bytes).
+ *
+ * In [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]], the physical payload is 16 bytes
+ * in the variable-length region (two 8-byte words); see
+ * [[org.apache.spark.sql.catalyst.expressions.TimestampNanosRowValues]].
*/
override def defaultSize: Int = 10
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/TimestampNTZNanosType.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/TimestampNTZNanosType.scala
index 722e0f2d25ed..7ef496e6aaee 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/TimestampNTZNanosType.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/TimestampNTZNanosType.scala
@@ -44,6 +44,10 @@ case class TimestampNTZNanosType(precision: Int) extends DatetimeType {
/**
* Default size used by Spark for row-size estimation. Values are represented logically as epoch
* microseconds (Long, 8 bytes) plus nanoseconds within that micro (Short, 2 bytes).
+ *
+ * In [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]], the physical payload is 16 bytes
+ * in the variable-length region (two 8-byte words); see
+ * [[org.apache.spark.sql.catalyst.expressions.TimestampNanosRowValues]].
*/
override def defaultSize: Int = 10
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGetters.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGetters.java
index 2a3a6884c3c6..f14edc585eaf 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGetters.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGetters.java
@@ -23,6 +23,7 @@ import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.TimestampNanosVal;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;
import org.apache.spark.unsafe.types.GeographyVal;
@@ -58,6 +59,12 @@ public interface SpecializedGetters {
CalendarInterval getInterval(int ordinal);
+ /** Nanosecond NTZ timestamp; see {@link TimestampNanosRowValues} for UnsafeRow layout. */
+ TimestampNanosVal getTimestampNTZNanos(int ordinal);
+
+ /** Nanosecond LTZ timestamp; see {@link TimestampNanosRowValues} for UnsafeRow layout. */
+ TimestampNanosVal getTimestampLTZNanos(int ordinal);
+
VariantVal getVariant(int ordinal);
InternalRow getStruct(int ordinal, int numFields);
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGettersReader.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGettersReader.java
index 830aa0d0d0fb..d1d4608a1d4c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGettersReader.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGettersReader.java
@@ -72,6 +72,12 @@ public final class SpecializedGettersReader {
if (physicalDataType instanceof PhysicalCalendarIntervalType) {
return obj.getInterval(ordinal);
}
+ if (physicalDataType instanceof PhysicalTimestampNTZNanosType) {
+ return obj.getTimestampNTZNanos(ordinal);
+ }
+ if (physicalDataType instanceof PhysicalTimestampLTZNanosType) {
+ return obj.getTimestampLTZNanos(ordinal);
+ }
if (physicalDataType instanceof PhysicalBinaryType) {
return obj.getBinary(ordinal);
}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/TimestampNanosRowValues.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/TimestampNanosRowValues.java
new file mode 100644
index 000000000000..be036b3d44b4
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/TimestampNanosRowValues.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions;
+
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.types.TimestampNanosVal;
+
+/**
+ * Shared read/write helpers for nanosecond timestamp values in {@link UnsafeRow} and
+ * {@link UnsafeArrayData}.
+ *
+ * <p>Each value occupies a 16-byte payload in the row's variable-length region (the field's 8-byte
+ * word stores {@code (offset << 32) | size}, like
+ * {@link org.apache.spark.unsafe.types.CalendarInterval}).
+ * The logical composite is 10 bytes (8-byte epoch micros + 2-byte sub-micro nanos), but UnsafeRow
+ * stores it as two aligned 8-byte words:
+ * <pre>
+ * word 0: epochMicros (long)
+ * word 1: nanosWithinMicro in the low 16 bits; upper 48 bits are zero
+ * </pre>
+ *
+ * <p>The second word is written and read as a full {@code long} (not {@code short}) so the upper
+ * 48 padding bits are deterministically zero. {@link UnsafeRow} compares and hashes rows
+ * byte-wise, so any non-deterministic padding would make logically-equal values look different.
+ * Byte order is not a concern because writes and reads both go through {@link Platform} and
+ * therefore share the same native endianness.
+ *
+ * <p>The {@code read*} methods do not check the null bit; callers must verify
+ * {@link SpecializedGetters#isNullAt(int)} first. Reads from a null cell return unspecified
+ * bytes (zero for slots that went through {@link #zeroPayload}, otherwise whatever the buffer
+ * was initialized with).
+ */
+public final class TimestampNanosRowValues {
+ /** Payload size in the UnsafeRow variable-length region (two 8-byte words). */
+ public static final int SIZE_IN_BYTES = TimestampNanosVal.SIZE_IN_BYTES;
+
+ private TimestampNanosRowValues() {
+ }
+
+ /**
+ * Writes a non-null nanosecond timestamp payload at {@code baseOffset + cursor}.
+ */
+ public static void writePayload(
+ Object baseObject, long baseOffset, int cursor, long epochMicros, short nanosWithinMicro) {
+ assert nanosWithinMicro >= 0 && nanosWithinMicro <= TimestampNanosVal.MAX_NANOS_WITHIN_MICRO :
+ "nanosWithinMicro out of range: " + nanosWithinMicro;
+ Platform.putLong(baseObject, baseOffset + cursor, epochMicros);
+ // Validated to [0, 999], so widening to long zero-extends; upper 48 bits become zero.
+ Platform.putLong(baseObject, baseOffset + cursor + 8, nanosWithinMicro);
+ }
+
+ /**
+ * Zeroes a payload slot. Used when updating a nullable column to null while retaining the
+ * variable-length offset for in-place updates (same pattern as {@link UnsafeRow#setInterval}).
+ */
+ public static void zeroPayload(Object baseObject, long baseOffset, int cursor) {
+ Platform.putLong(baseObject, baseOffset + cursor, 0L);
+ Platform.putLong(baseObject, baseOffset + cursor + 8, 0L);
+ }
+
+ public static long readEpochMicros(Object baseObject, long baseOffset, int offset) {
+ return Platform.getLong(baseObject, baseOffset + offset);
+ }
+
+ public static short readNanosWithinMicro(Object baseObject, long baseOffset, int offset) {
+ // Match writePayload's putLong; the (short) cast truncates to the low 16 bits.
+ return (short) Platform.getLong(baseObject, baseOffset + offset + 8);
+ }
+
+ public static TimestampNanosVal readVal(Object baseObject, long baseOffset, int offset) {
+ // Use the trusted factory: every value that ever reached the row was validated at its
+ // origin (the public constructor / fromParts), so re-checking the nanos range on every
+ // cell read would be wasted work on a hot path.
+ return TimestampNanosVal.fromTrustedRowBytes(
+ readEpochMicros(baseObject, baseOffset, offset),
+ readNanosWithinMicro(baseObject, baseOffset, offset));
+ }
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
index 09ac634955fc..b653ae60ce9c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
@@ -38,6 +38,7 @@ import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.bitset.BitSetMethods;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.TimestampNanosVal;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;
import org.apache.spark.unsafe.types.GeographyVal;
@@ -248,6 +249,22 @@ public final class UnsafeArrayData extends ArrayData implements Externalizable,
return new CalendarInterval(months, days, microseconds);
}
+ // NTZ and LTZ share the byte layout; the duplicated getters track the SpecializedGetters
+ // interface, where the distinction is preserved at the logical-type level.
+ @Override
+ public TimestampNanosVal getTimestampNTZNanos(int ordinal) {
+ if (isNullAt(ordinal)) return null;
+ final int offset = (int) (getLong(ordinal) >> 32);
+ return TimestampNanosRowValues.readVal(baseObject, baseOffset, offset);
+ }
+
+ @Override
+ public TimestampNanosVal getTimestampLTZNanos(int ordinal) {
+ if (isNullAt(ordinal)) return null;
+ final int offset = (int) (getLong(ordinal) >> 32);
+ return TimestampNanosRowValues.readVal(baseObject, baseOffset, offset);
+ }
+
@Override
public VariantVal getVariant(int ordinal) {
if (isNullAt(ordinal)) return null;
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index ff9eeea9bf12..60af3ac2b2aa 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -38,6 +38,7 @@ import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.bitset.BitSetMethods;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.TimestampNanosVal;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;
import org.apache.spark.unsafe.types.GeographyVal;
@@ -96,7 +97,9 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
}
PhysicalDataType pdt = PhysicalDataType.apply(dt);
return pdt instanceof PhysicalPrimitiveType || pdt instanceof PhysicalDecimalType ||
- pdt instanceof PhysicalCalendarIntervalType;
+ pdt instanceof PhysicalCalendarIntervalType ||
+ pdt instanceof PhysicalTimestampNTZNanosType ||
+ pdt instanceof PhysicalTimestampLTZNanosType;
}
//////////////////////////////////////////////////////////////////////////////
@@ -322,6 +325,48 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
}
}
+ /**
+ * Updates a nanosecond NTZ timestamp column in place. Uses the same variable-length layout as
+ * {@link #setInterval(int, CalendarInterval)}. For null values, do not call {@link #setNullAt};
+ * pass {@code null} here so the variable-length offset is retained for future updates.
+ *
+ * <p>NTZ and LTZ share the byte layout; the duplicated setter/getter pairs track the
+ * {@link SpecializedGetters} interface, where the distinction is preserved at the logical-type
+ * level.
+ */
+ @Override
+ public void setTimestampNTZNanos(int ordinal, TimestampNanosVal value) {
+ setTimestampNanosPayload(ordinal, value);
+ }
+
+ /**
+ * Updates a nanosecond LTZ timestamp column in place. See {@link #setTimestampNTZNanos(int,
+ * TimestampNanosVal)}.
+ */
+ @Override
+ public void setTimestampLTZNanos(int ordinal, TimestampNanosVal value) {
+ setTimestampNanosPayload(ordinal, value);
+ }
+
+ // 16-byte payload in the variable-length region; see TimestampNanosRowValues.
+ private void setTimestampNanosPayload(int ordinal, TimestampNanosVal value) {
+ assertIndexIsValid(ordinal);
+ long cursor = getLong(ordinal) >>> 32;
+ assert cursor > 0 : "invalid cursor " + cursor;
+ long offsetAndSize = (cursor << 32) | TimestampNanosRowValues.SIZE_IN_BYTES;
+ if (value == null) {
+ // Set the null bit directly instead of via setNullAt; setNullAt would zero the field slot
+ // that we immediately overwrite with offsetAndSize below.
+ BitSetMethods.set(baseObject, baseOffset, ordinal);
+ TimestampNanosRowValues.zeroPayload(baseObject, baseOffset, (int) cursor);
+ Platform.putLong(baseObject, getFieldOffset(ordinal), offsetAndSize);
+ } else {
+ TimestampNanosRowValues.writePayload(
+ baseObject, baseOffset, (int) cursor, value.epochMicros, value.nanosWithinMicro);
+ setLong(ordinal, offsetAndSize);
+ }
+ }
+
@Override
public Object get(int ordinal, DataType dataType) {
return SpecializedGettersReader.read(this, ordinal, dataType, true, true);
@@ -446,6 +491,26 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
}
}
+ @Override
+ public TimestampNanosVal getTimestampNTZNanos(int ordinal) {
+ if (isNullAt(ordinal)) {
+ return null;
+ } else {
+ final int offset = (int) (getLong(ordinal) >> 32);
+ return TimestampNanosRowValues.readVal(baseObject, baseOffset, offset);
+ }
+ }
+
+ @Override
+ public TimestampNanosVal getTimestampLTZNanos(int ordinal) {
+ if (isNullAt(ordinal)) {
+ return null;
+ } else {
+ final int offset = (int) (getLong(ordinal) >> 32);
+ return TimestampNanosRowValues.readVal(baseObject, baseOffset, offset);
+ }
+ }
+
@Override
public VariantVal getVariant(int ordinal) {
if (isNullAt(ordinal)) return null;
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
index 3070fa3e74b1..47a88fb4d190 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
@@ -23,6 +23,7 @@ import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.bitset.BitSetMethods;
import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.TimestampNanosVal;
import static
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.calculateHeaderPortionInBytes;
@@ -206,4 +207,14 @@ public final class UnsafeArrayWriter extends UnsafeWriter {
super.write(ordinal, input);
}
}
+
+ @Override
+ public void write(int ordinal, TimestampNanosVal input) {
+ assertIndexIsValid(ordinal);
+ if (input == null) {
+ setNull(ordinal);
+ } else {
+ super.write(ordinal, input);
+ }
+ }
}
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java
index e2abc108bb1b..5341927dd296 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java
@@ -24,6 +24,8 @@ import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.bitset.BitSetMethods;
import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.TimestampNanosVal;
+import org.apache.spark.sql.catalyst.expressions.TimestampNanosRowValues;
import org.apache.spark.unsafe.types.GeographyVal;
import org.apache.spark.unsafe.types.GeometryVal;
import org.apache.spark.unsafe.types.UTF8String;
@@ -161,6 +163,19 @@ public abstract class UnsafeWriter {
increaseCursor(16);
}
+ // 16-byte variable-length payload; same layout as UnsafeRow#setTimestampNanosPayload.
+ public void write(int ordinal, TimestampNanosVal input) {
+ grow(TimestampNanosRowValues.SIZE_IN_BYTES);
+ if (input == null) {
+ BitSetMethods.set(getBuffer(), startingOffset, ordinal);
+ } else {
+ TimestampNanosRowValues.writePayload(
+ getBuffer(), 0, (int) cursor(), input.epochMicros, input.nanosWithinMicro);
+ }
+ setOffsetAndSize(ordinal, TimestampNanosRowValues.SIZE_IN_BYTES);
+ increaseCursor(TimestampNanosRowValues.SIZE_IN_BYTES);
+ }
+
public void write(int ordinal, VariantVal input) {
// See the class comment of VariantVal for the format of the binary
content.
byte[] value = input.getValue();
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index 8e9a5a620b3e..952d084ac190 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -18,11 +18,13 @@ package org.apache.spark.sql.vectorized;
import scala.PartialFunction;
+import org.apache.spark.SparkUnsupportedOperationException;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.UserDefinedType;
import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.TimestampNanosVal;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;
import org.apache.spark.unsafe.types.GeographyVal;
@@ -326,6 +328,14 @@ public abstract class ColumnVector implements
AutoCloseable {
return new CalendarInterval(months, days, microseconds);
}
+ public TimestampNanosVal getTimestampNTZNanos(int rowId) {
+ throw SparkUnsupportedOperationException.apply();
+ }
+
+ public TimestampNanosVal getTimestampLTZNanos(int rowId) {
+ throw SparkUnsupportedOperationException.apply();
+ }
+
/**
* Returns the Variant value for {@code rowId}. Similar to {@link
#getInterval(int)}, the
* implementation must implement {@link #getChild(int)} and define 2 child
vectors of binary type
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
index 861a6a4c50e4..dda358ceb420 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.GenericArrayData;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.TimestampNanosVal;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;
import org.apache.spark.unsafe.types.GeographyVal;
@@ -191,6 +192,16 @@ public final class ColumnarArray extends ArrayData {
return data.getInterval(offset + ordinal);
}
+ @Override
+ public TimestampNanosVal getTimestampNTZNanos(int ordinal) {
+ return data.getTimestampNTZNanos(offset + ordinal);
+ }
+
+ @Override
+ public TimestampNanosVal getTimestampLTZNanos(int ordinal) {
+ return data.getTimestampLTZNanos(offset + ordinal);
+ }
+
@Override
public VariantVal getVariant(int ordinal) {
return data.getVariant(offset + ordinal);
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
index 42b335dfd2bc..ae1439883a1a 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
@@ -25,6 +25,7 @@ import
org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.types.*;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.TimestampNanosVal;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;
import org.apache.spark.unsafe.types.GeographyVal;
@@ -155,6 +156,16 @@ public final class ColumnarBatchRow extends InternalRow {
return columns[ordinal].getInterval(rowId);
}
+ @Override
+ public TimestampNanosVal getTimestampNTZNanos(int ordinal) {
+ return columns[ordinal].getTimestampNTZNanos(rowId);
+ }
+
+ @Override
+ public TimestampNanosVal getTimestampLTZNanos(int ordinal) {
+ return columns[ordinal].getTimestampLTZNanos(rowId);
+ }
+
@Override
public VariantVal getVariant(int ordinal) {
return columns[ordinal].getVariant(rowId);
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
index d66baa8fd8fe..79b78c46cf6a 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
@@ -23,6 +23,7 @@ import
org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.types.*;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.TimestampNanosVal;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;
import org.apache.spark.unsafe.types.GeographyVal;
@@ -160,6 +161,16 @@ public final class ColumnarRow extends InternalRow {
return data.getChild(ordinal).getInterval(rowId);
}
+ @Override
+ public TimestampNanosVal getTimestampNTZNanos(int ordinal) {
+ return data.getChild(ordinal).getTimestampNTZNanos(rowId);
+ }
+
+ @Override
+ public TimestampNanosVal getTimestampLTZNanos(int ordinal) {
+ return data.getChild(ordinal).getTimestampLTZNanos(rowId);
+ }
+
@Override
public VariantVal getVariant(int ordinal) {
return data.getChild(ordinal).getVariant(rowId);
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
index b27283cb3f64..1323f3737cff 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.types.ops.TypeOps
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+import org.apache.spark.unsafe.types.{CalendarInterval, TimestampNanosVal, UTF8String}
import org.apache.spark.util.ArrayImplicits._
/**
@@ -63,6 +63,18 @@ abstract class InternalRow extends SpecializedGetters with
Serializable {
def setInterval(i: Int, value: CalendarInterval): Unit = update(i, value)
+ /**
+ * Sets a nanosecond NTZ timestamp. On [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]],
+ * use this instead of [[setNullAt]] for null so the variable-length offset is preserved.
+ */
+ def setTimestampNTZNanos(i: Int, value: TimestampNanosVal): Unit = update(i, value)
+
+ /**
+ * Sets a nanosecond LTZ timestamp. On [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]],
+ * use this instead of [[setNullAt]] for null so the variable-length offset is preserved.
+ */
+ def setTimestampLTZNanos(i: Int, value: TimestampNanosVal): Unit = update(i, value)
+
/**
* Make a copy of the current [[InternalRow]] object.
*/
@@ -144,6 +156,10 @@ object InternalRow {
case _: PhysicalStringType => (input, ordinal) =>
input.getUTF8String(ordinal)
case PhysicalBinaryType => (input, ordinal) => input.getBinary(ordinal)
case PhysicalCalendarIntervalType => (input, ordinal) =>
input.getInterval(ordinal)
+ case PhysicalTimestampNTZNanosType => (input, ordinal) =>
+ input.getTimestampNTZNanos(ordinal)
+ case PhysicalTimestampLTZNanosType => (input, ordinal) =>
+ input.getTimestampLTZNanos(ordinal)
case t: PhysicalDecimalType => (input, ordinal) =>
input.getDecimal(ordinal, t.precision, t.scale)
case t: PhysicalStructType => (input, ordinal) =>
input.getStruct(ordinal, t.fields.length)
@@ -185,6 +201,10 @@ object InternalRow {
case DoubleType => (input, v) => input.setDouble(ordinal,
v.asInstanceOf[Double])
case CalendarIntervalType =>
(input, v) => input.setInterval(ordinal,
v.asInstanceOf[CalendarInterval])
+ case _: TimestampNTZNanosType =>
+ (input, v) => input.setTimestampNTZNanos(ordinal, v.asInstanceOf[TimestampNanosVal])
+ case _: TimestampLTZNanosType =>
+ (input, v) => input.setTimestampLTZNanos(ordinal, v.asInstanceOf[TimestampNanosVal])
case DecimalType.Fixed(precision, _) =>
(input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal],
precision)
case udt: UserDefinedType[_] => getWriter(ordinal, udt.sqlType)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ProjectingInternalRow.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ProjectingInternalRow.scala
index 0e451db6cfe2..5923327e4a5c 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ProjectingInternalRow.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ProjectingInternalRow.scala
@@ -105,6 +105,14 @@ case class ProjectingInternalRow(schema: StructType,
row.getInterval(colOrdinals(ordinal))
}
+ override def getTimestampNTZNanos(ordinal: Int): TimestampNanosVal = {
+ row.getTimestampNTZNanos(colOrdinals(ordinal))
+ }
+
+ override def getTimestampLTZNanos(ordinal: Int): TimestampNanosVal = {
+ row.getTimestampLTZNanos(colOrdinals(ordinal))
+ }
+
override def getVariant(ordinal: Int): VariantVal = {
row.getVariant(colOrdinals(ordinal))
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
index 53b3e0598d58..ad9ea95c2c96 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
@@ -159,6 +159,12 @@ object InterpretedUnsafeProjection {
case PhysicalCalendarIntervalType => (v, i) => writer.write(i,
v.getInterval(i))
+ case PhysicalTimestampNTZNanosType => (v, i) =>
+ writer.write(i, v.getTimestampNTZNanos(i))
+
+ case PhysicalTimestampLTZNanosType => (v, i) =>
+ writer.write(i, v.getTimestampLTZNanos(i))
+
case PhysicalBinaryType => (v, i) => writer.write(i, v.getBinary(i))
case _: PhysicalStringType => (v, i) => writer.write(i,
v.getUTF8String(i))
@@ -256,6 +262,9 @@ object InterpretedUnsafeProjection {
case CalendarIntervalType =>
// We can't call setNullAt() for CalendarIntervalType, we call write
directly.
unsafeWriter
+ case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
+ // We can't call setNullAt() for nanos timestamp types, we call write directly.
+ unsafeWriter
case BooleanType | ByteType =>
(v, i) => {
if (!v.isNullAt(i)) {
@@ -301,6 +310,7 @@ object InterpretedUnsafeProjection {
@scala.annotation.tailrec
private def getElementSize(dataType: DataType): Int = dataType match {
case NullType | _: StringType | BinaryType | CalendarIntervalType |
VariantType |
+ _: TimestampNTZNanosType | _: TimestampLTZNanosType |
_: DecimalType | _: StructType | _: ArrayType | _: MapType => 8
case udt: UserDefinedType[_] =>
getElementSize(udt.sqlType)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
index 4211dd5e4df0..66b6640e5432 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
@@ -126,6 +126,14 @@ class JoinedRow extends InternalRow {
override def getInterval(i: Int): CalendarInterval =
if (i < row1.numFields) row1.getInterval(i) else row2.getInterval(i -
row1.numFields)
+ override def getTimestampNTZNanos(i: Int): TimestampNanosVal =
+ if (i < row1.numFields) row1.getTimestampNTZNanos(i)
+ else row2.getTimestampNTZNanos(i - row1.numFields)
+
+ override def getTimestampLTZNanos(i: Int): TimestampNanosVal =
+ if (i < row1.numFields) row1.getTimestampLTZNanos(i)
+ else row2.getTimestampLTZNanos(i - row1.numFields)
+
override def getVariant(i: Int): VariantVal =
if (i < row1.numFields) row1.getVariant(i) else row2.getVariant(i -
row1.numFields)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 139a7f03cfa4..330116e59248 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -1531,6 +1531,7 @@ object CodeGenerator extends Logging {
classOf[UTF8String].getName,
classOf[Decimal].getName,
classOf[CalendarInterval].getName,
+ classOf[org.apache.spark.unsafe.types.TimestampNanosVal].getName,
classOf[VariantVal].getName,
classOf[ArrayData].getName,
classOf[UnsafeArrayData].getName,
@@ -1695,6 +1696,8 @@ object CodeGenerator extends Logging {
case _: PhysicalGeographyType => s"$input.getGeography($ordinal)"
case _: PhysicalGeometryType => s"$input.getGeometry($ordinal)"
case PhysicalCalendarIntervalType => s"$input.getInterval($ordinal)"
+ case PhysicalTimestampNTZNanosType => s"$input.getTimestampNTZNanos($ordinal)"
+ case PhysicalTimestampLTZNanosType => s"$input.getTimestampLTZNanos($ordinal)"
case t: PhysicalDecimalType => s"$input.getDecimal($ordinal,
${t.precision}, ${t.scale})"
case _: PhysicalMapType => s"$input.getMap($ordinal)"
case PhysicalNullType => "null"
@@ -1768,6 +1771,8 @@ object CodeGenerator extends Logging {
dataType match {
case _ if isPrimitiveType(jt) =>
s"$row.set${primitiveTypeName(jt)}($ordinal, $value)"
case CalendarIntervalType => s"$row.setInterval($ordinal, $value)"
+ case _: TimestampNTZNanosType => s"$row.setTimestampNTZNanos($ordinal, $value)"
+ case _: TimestampLTZNanosType => s"$row.setTimestampLTZNanos($ordinal, $value)"
case t: DecimalType => s"$row.setDecimal($ordinal, $value,
${t.precision})"
case udt: UserDefinedType[_] => setColumn(row, udt.sqlType, ordinal,
value)
// The UTF8String, InternalRow, ArrayData and MapData may came from
UnsafeRow, we should copy
@@ -1983,6 +1988,8 @@ object CodeGenerator extends Logging {
case PhysicalBooleanType => JAVA_BOOLEAN
case PhysicalByteType => JAVA_BYTE
case PhysicalCalendarIntervalType => "CalendarInterval"
+ case PhysicalTimestampNTZNanosType => "TimestampNanosVal"
+ case PhysicalTimestampLTZNanosType => "TimestampNanosVal"
case PhysicalIntegerType => JAVA_INT
case _: PhysicalDecimalType => "Decimal"
case PhysicalDoubleType => JAVA_DOUBLE
@@ -2015,6 +2022,8 @@ object CodeGenerator extends Logging {
case _: GeometryType => classOf[GeometryVal]
case _: StringType => classOf[UTF8String]
case CalendarIntervalType => classOf[CalendarInterval]
+ case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
+ classOf[org.apache.spark.unsafe.types.TimestampNanosVal]
case _: StructType => classOf[InternalRow]
case _: ArrayType => classOf[ArrayData]
case _: MapType => classOf[MapData]
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
index 459c1d9a8ba1..0f64eefe1a06 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
@@ -38,6 +38,7 @@ object GenerateUnsafeProjection extends
CodeGenerator[Seq[Expression], UnsafePro
/** Returns true iff we support this data type. */
def canSupport(dataType: DataType): Boolean =
UserDefinedType.sqlType(dataType) match {
case NullType => true
+ case _: TimestampNTZNanosType | _: TimestampLTZNanosType => true
case _: AtomicType => true
case _: CalendarIntervalType => true
case t: StructType => t.forall(field => canSupport(field.dataType))
@@ -112,6 +113,8 @@ object GenerateUnsafeProjection extends
CodeGenerator[Seq[Expression], UnsafePro
// Can't call setNullAt() for DecimalType with precision larger
than 18.
s"$rowWriter.write($index, (Decimal) null, ${t.precision},
${t.scale});"
case CalendarIntervalType => s"$rowWriter.write($index,
(CalendarInterval) null);"
+ case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
+ s"$rowWriter.write($index, (TimestampNanosVal) null);"
case _ => s"$rowWriter.setNullAt($index);"
}
@@ -176,10 +179,19 @@ object GenerateUnsafeProjection extends
CodeGenerator[Seq[Expression], UnsafePro
val element = CodeGenerator.getValue(tmpInput, et, index)
+ val setNullElement = et match {
+ case t: DecimalType if t.precision > Decimal.MAX_LONG_DIGITS =>
+ s"$arrayWriter.write($index, (Decimal) null, ${t.precision}, ${t.scale});"
+ case CalendarIntervalType => s"$arrayWriter.write($index, (CalendarInterval) null);"
+ case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
+ s"$arrayWriter.write($index, (TimestampNanosVal) null);"
+ case _ => s"$arrayWriter.setNull${elementOrOffsetSize}Bytes($index);"
+ }
+
val elementAssignment = if (containsNull) {
s"""
|if ($tmpInput.isNullAt($index)) {
- | $arrayWriter.setNull${elementOrOffsetSize}Bytes($index);
+ | $setNullElement
|} else {
| ${writeElement(ctx, element, index, et, arrayWriter)}
|}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index 3b222ca05235..0b1402b9103c 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -149,6 +149,7 @@ object Literal {
case _ if clz == classOf[BigInt] => DecimalType.SYSTEM_DEFAULT
case _ if clz == classOf[BigDecimal] => DecimalType.SYSTEM_DEFAULT
case _ if clz == classOf[CalendarInterval] => CalendarIntervalType
+ case _ if clz == classOf[TimestampNanosVal] => TimestampNTZNanosType()
case _ if clz == classOf[VariantVal] => VariantType
case _ if clz.isArray =>
ArrayType(componentTypeToDataType(clz.getComponentType))
@@ -203,6 +204,8 @@ object Literal {
case DateType => create(0, DateType)
case TimestampType => create(0L, TimestampType)
case TimestampNTZType => create(0L, TimestampNTZType)
+ case t: TimestampNTZNanosType => create(TimestampNanosVal.ZERO, t)
+ case t: TimestampLTZNanosType => create(TimestampNanosVal.ZERO, t)
case t: TimeType => create(0L, t)
case it: DayTimeIntervalType => create(0L, it)
case it: YearMonthIntervalType => create(0, it)
@@ -242,6 +245,8 @@ object Literal {
case PhysicalBooleanType => v.isInstanceOf[Boolean]
case PhysicalByteType => v.isInstanceOf[Byte]
case PhysicalCalendarIntervalType => v.isInstanceOf[CalendarInterval]
+ case PhysicalTimestampNTZNanosType => v.isInstanceOf[TimestampNanosVal]
+ case PhysicalTimestampLTZNanosType => v.isInstanceOf[TimestampNanosVal]
case PhysicalIntegerType => v.isInstanceOf[Int]
case _: PhysicalDecimalType => v.isInstanceOf[Decimal]
case PhysicalDoubleType => v.isInstanceOf[Double]
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index b8d6054fc6fc..b897c0e6cc08 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -49,6 +49,8 @@ trait BaseGenericInternalRow extends InternalRow {
override def getGeometry(ordinal: Int): GeometryVal = getAs(ordinal)
override def getArray(ordinal: Int): ArrayData = getAs(ordinal)
override def getInterval(ordinal: Int): CalendarInterval = getAs(ordinal)
+ override def getTimestampNTZNanos(ordinal: Int): TimestampNanosVal = getAs(ordinal)
+ override def getTimestampLTZNanos(ordinal: Int): TimestampNanosVal = getAs(ordinal)
override def getVariant(ordinal: Int): VariantVal = getAs(ordinal)
override def getMap(ordinal: Int): MapData = getAs(ordinal)
override def getStruct(ordinal: Int, numFields: Int): InternalRow =
getAs(ordinal)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
index 6f49b3998652..d89feb7e0dfd 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.expressions.{Ascending,
BoundReference, Int
import org.apache.spark.sql.catalyst.types.ops.TypeOps
import org.apache.spark.sql.catalyst.util.{ArrayData, CollationFactory,
MapData, SQLOrderingUtil}
import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType,
ByteExactNumeric, ByteType, CalendarIntervalType, CharType, DataType, DateType,
DayTimeIntervalType, Decimal, DecimalExactNumeric, DecimalType,
DoubleExactNumeric, DoubleType, FloatExactNumeric, FloatType, FractionalType,
GeographyType, GeometryType, IntegerExactNumeric, IntegerType, IntegralType,
LongExactNumeric, LongType, MapType, NullType, NumericType, ShortExactNumeric,
ShortType, StringType, StructField, StructT [...]
-import org.apache.spark.unsafe.types.{ByteArray, GeographyVal, GeometryVal,
UTF8String, VariantVal}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType,
ByteExactNumeric, ByteType, CalendarIntervalType, CharType, DataType, DateType,
DayTimeIntervalType, Decimal, DecimalExactNumeric, DecimalType,
DoubleExactNumeric, DoubleType, FloatExactNumeric, FloatType, FractionalType,
GeographyType, GeometryType, IntegerExactNumeric, IntegerType, IntegralType,
LongExactNumeric, LongType, MapType, NullType, NumericType, ShortExactNumeric,
ShortType, StringType, StructField, StructT [...]
+import org.apache.spark.unsafe.types.{ByteArray, GeographyVal, GeometryVal, TimestampNanosVal, UTF8String, VariantVal}
import org.apache.spark.util.ArrayImplicits._
sealed abstract class PhysicalDataType {
@@ -55,6 +55,8 @@ object PhysicalDataType {
case TimestampType => PhysicalLongType
case TimestampNTZType => PhysicalLongType
case CalendarIntervalType => PhysicalCalendarIntervalType
+ case _: TimestampNTZNanosType => PhysicalTimestampNTZNanosType
+ case _: TimestampLTZNanosType => PhysicalTimestampLTZNanosType
case DayTimeIntervalType(_, _) => PhysicalLongType
case YearMonthIntervalType(_, _) => PhysicalIntegerType
case DateType => PhysicalIntegerType
@@ -166,6 +168,46 @@ class PhysicalCalendarIntervalType() extends
PhysicalDataType {
}
case object PhysicalCalendarIntervalType extends PhysicalCalendarIntervalType
+/**
+ * Physical type for [[org.apache.spark.sql.types.TimestampNTZNanosType]]. Internal values are
+ * [[TimestampNanosVal]] (epoch micros + nanos within the micro). Stored in [[UnsafeRow]] via a
+ * 16-byte variable-length payload; see
+ * [[org.apache.spark.sql.catalyst.expressions.TimestampNanosRowValues]].
+ *
+ * Storage layout is identical to [[PhysicalTimestampLTZNanosType]]; both types exist so the
+ * NTZ/LTZ distinction propagates through the physical-type system to consumers that need it.
+ *
+ * Ordering, compare, and hash are not implemented yet and will be added in a follow-up issue.
+ */
+class PhysicalTimestampNTZNanosType() extends PhysicalDataType {
+ override private[sql] def ordering =
+ throw QueryExecutionErrors.orderedOperationUnsupportedByDataTypeError(
+ "PhysicalTimestampNTZNanosType")
+ override private[sql] type InternalType = TimestampNanosVal
+ @transient private[sql] lazy val tag = typeTag[InternalType]
+}
+case object PhysicalTimestampNTZNanosType extends PhysicalTimestampNTZNanosType
+
+/**
+ * Physical type for [[org.apache.spark.sql.types.TimestampLTZNanosType]]. Internal values are
+ * [[TimestampNanosVal]] (epoch micros + nanos within the micro). Stored in [[UnsafeRow]] via a
+ * 16-byte variable-length payload; see
+ * [[org.apache.spark.sql.catalyst.expressions.TimestampNanosRowValues]].
+ *
+ * Storage layout is identical to [[PhysicalTimestampNTZNanosType]]; both types exist so the
+ * NTZ/LTZ distinction propagates through the physical-type system to consumers that need it.
+ *
+ * Ordering, compare, and hash are not implemented yet and will be added in a follow-up issue.
+ */
+class PhysicalTimestampLTZNanosType() extends PhysicalDataType {
+ override private[sql] def ordering =
+ throw QueryExecutionErrors.orderedOperationUnsupportedByDataTypeError(
+ "PhysicalTimestampLTZNanosType")
+ override private[sql] type InternalType = TimestampNanosVal
+ @transient private[sql] lazy val tag = typeTag[InternalType]
+}
+case object PhysicalTimestampLTZNanosType extends PhysicalTimestampLTZNanosType
+
case class PhysicalDecimalType(precision: Int, scale: Int) extends
PhysicalFractionalType {
private[sql] type InternalType = Decimal
private[sql] val ordering = Decimal.DecimalIsFractional
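
The Scaladoc above describes the internal value as "epoch micros + nanos within the micro". As an illustration of that arithmetic only, the sketch below splits a `java.time.Instant` into the two parts and joins them again. The class `NanosPartsSketch` and its methods are hypothetical; the conversion helpers Spark actually uses are not part of this section of the diff.

```java
import java.time.Instant;

public class NanosPartsSketch {
  // Whole microseconds since the epoch; throws ArithmeticException on overflow.
  static long epochMicros(Instant t) {
    long secondsAsMicros = Math.multiplyExact(t.getEpochSecond(), 1_000_000L);
    return Math.addExact(secondsAsMicros, t.getNano() / 1_000);
  }

  // Remaining nanoseconds inside that microsecond, always in 0..999.
  static short nanosWithinMicro(Instant t) {
    return (short) (t.getNano() % 1_000);
  }

  // Inverse of the two methods above; floor division keeps pre-epoch values correct.
  static Instant toInstant(long epochMicros, short nanosWithinMicro) {
    long seconds = Math.floorDiv(epochMicros, 1_000_000L);
    long microsOfSecond = Math.floorMod(epochMicros, 1_000_000L);
    return Instant.ofEpochSecond(seconds, microsOfSecond * 1_000L + nanosWithinMicro);
  }

  public static void main(String[] args) {
    // One nanosecond before the epoch.
    Instant t = Instant.ofEpochSecond(-1L, 999_999_999L);
    long micros = epochMicros(t);
    short nanos = nanosWithinMicro(t);
    System.out.println(micros + " micros, " + nanos + " nanos");
    System.out.println(toInstant(micros, nanos).equals(t));
  }
}
```

The sub-microsecond part is never negative, even for instants before the epoch, which is why the second component fits the 0 to 999 range stated in the commit description.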
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GenericArrayData.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GenericArrayData.scala
index 808a3d43bf20..98fd25c68388 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GenericArrayData.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GenericArrayData.scala
@@ -75,6 +75,8 @@ class GenericArrayData(val array: Array[Any]) extends
ArrayData {
override def getGeography(ordinal: Int): GeographyVal = getAs(ordinal)
override def getGeometry(ordinal: Int): GeometryVal = getAs(ordinal)
override def getInterval(ordinal: Int): CalendarInterval = getAs(ordinal)
+ override def getTimestampNTZNanos(ordinal: Int): TimestampNanosVal = getAs(ordinal)
+ override def getTimestampLTZNanos(ordinal: Int): TimestampNanosVal = getAs(ordinal)
override def getVariant(ordinal: Int): VariantVal = getAs(ordinal)
override def getStruct(ordinal: Int, numFields: Int): InternalRow =
getAs(ordinal)
override def getArray(ordinal: Int): ArrayData = getAs(ordinal)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
index f2925314e2e2..4da9c88dd9a9 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
@@ -173,6 +173,7 @@ object UnsafeRowUtils {
def avoidSetNullAt(dt: DataType): Boolean = dt match {
case t: DecimalType if t.precision > Decimal.MAX_LONG_DIGITS => true
case CalendarIntervalType => true
+ case _: TimestampNTZNanosType | _: TimestampLTZNanosType => true
case _ => false
}
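
Why these types are routed away from `setNullAt` can be shown with a toy row. This is a minimal sketch under stated assumptions, not Spark code: the class `NullSlotSketch` is hypothetical, the "row" is a 32-byte `ByteBuffer` with one null-bit word, one field slot, and one 16-byte payload, and `naiveSetNull` imitates a null write that zeroes the field slot, as the comment in `setTimestampNanosPayload` says `setNullAt` would.

```java
import java.nio.ByteBuffer;

public class NullSlotSketch {
  static final int SIZE = 16;           // payload size in bytes
  static final int NULL_BITS_AT = 0;    // word 0: null bits
  static final int SLOT_AT = 8;         // word 1: field slot (offset << 32 | size)
  static final int PAYLOAD_AT = 16;     // bytes 16..31: payload

  final ByteBuffer buf = ByteBuffer.allocate(32);

  NullSlotSketch() {
    buf.putLong(SLOT_AT, ((long) PAYLOAD_AT << 32) | SIZE);
  }

  boolean isNull() {
    return (buf.getLong(NULL_BITS_AT) & 1L) != 0;
  }

  int payloadOffset() {
    return (int) (buf.getLong(SLOT_AT) >>> 32);
  }

  long epochMicros() {
    return buf.getLong(payloadOffset());
  }

  // Generic null write: sets the null bit and zeroes the slot, losing the offset.
  void naiveSetNull() {
    buf.putLong(NULL_BITS_AT, buf.getLong(NULL_BITS_AT) | 1L);
    buf.putLong(SLOT_AT, 0L);
  }

  // Offset-preserving null write: null bit set, payload zeroed, slot rewritten.
  void setNullKeepingOffset() {
    int offset = payloadOffset();
    buf.putLong(NULL_BITS_AT, buf.getLong(NULL_BITS_AT) | 1L);
    buf.putLong(offset, 0L);
    buf.putLong(offset + 8, 0L);
    buf.putLong(SLOT_AT, ((long) offset << 32) | SIZE);
  }

  // In-place update; fails if the offset was lost by an earlier naive null write.
  void set(long epochMicros, short nanosWithinMicro) {
    int offset = payloadOffset();
    if (offset <= 0) {
      throw new IllegalStateException("invalid cursor " + offset);
    }
    buf.putLong(NULL_BITS_AT, buf.getLong(NULL_BITS_AT) & ~1L);
    buf.putLong(offset, epochMicros);
    buf.putLong(offset + 8, nanosWithinMicro & 0xFFFFL);
  }

  public static void main(String[] args) {
    NullSlotSketch row = new NullSlotSketch();
    row.setNullKeepingOffset();
    row.set(5L, (short) 7);  // succeeds: the offset survived the null write
    System.out.println(row.isNull() + " " + row.epochMicros());
  }
}
```

After `naiveSetNull`, a call to `set` throws because the slot no longer points at the payload; after `setNullKeepingOffset`, the same call updates the existing 16 bytes in place.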
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
index 6b642e874636..7391e9869a56 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DayTimeIntervalType._
import org.apache.spark.sql.types.YearMonthIntervalType._
-import org.apache.spark.unsafe.types.{CalendarInterval, GeographyVal,
GeometryVal, UTF8String}
+import org.apache.spark.unsafe.types.{CalendarInterval, GeographyVal, GeometryVal, TimestampNanosVal, UTF8String}
class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -85,6 +85,10 @@ class LiteralExpressionSuite extends SparkFunSuite with
ExpressionEvalHelper {
}
checkEvaluation(Literal.default(TimeType()), LocalTime.MIDNIGHT)
checkEvaluation(Literal.default(CalendarIntervalType), new
CalendarInterval(0, 0, 0L))
+ checkEvaluation(
+ Literal.default(TimestampNTZNanosType(9)), new TimestampNanosVal(0L, 0.toShort))
+ checkEvaluation(
+ Literal.default(TimestampLTZNanosType(7)), new TimestampNanosVal(0L, 0.toShort))
checkEvaluation(Literal.default(YearMonthIntervalType()), 0)
checkEvaluation(Literal.default(DayTimeIntervalType()), 0L)
checkEvaluation(Literal.default(ArrayType(StringType)), Array())
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimestampNanosRowSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimestampNanosRowSuite.scala
new file mode 100644
index 000000000000..d85350504f7f
--- /dev/null
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimestampNanosRowSuite.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.TimestampNanosVal
+import org.apache.spark.util.ArrayImplicits._
+
+class TimestampNanosRowSuite extends SparkFunSuite with ExpressionEvalHelper {
+
+ private val ntzValue = TimestampNanosVal.fromParts(1234567890123L, 42.toShort)
+ private val ltzValue = TimestampNanosVal.fromParts(9876543210987L, 999.toShort)
+
+ test("GenerateUnsafeProjection.canSupport for nanos timestamp types") {
+ assert(GenerateUnsafeProjection.canSupport(TimestampNTZNanosType(9)))
+ assert(GenerateUnsafeProjection.canSupport(TimestampLTZNanosType(7)))
+ }
+
+ test("GenericInternalRow roundtrip for TIMESTAMP_NTZ nanos") {
+ val row = new GenericInternalRow(Array[Any](ntzValue, null))
+ val accessor = InternalRow.getAccessor(TimestampNTZNanosType(9))
+ val writer = InternalRow.getWriter(0, TimestampNTZNanosType(9))
+ assert(accessor(row, 0) === ntzValue)
+ assert(accessor(row, 1) === null)
+
+ val row2 = new GenericInternalRow(Array[Any](null, null))
+ writer(row2, ntzValue)
+ assert(accessor(row2, 0) === ntzValue)
+ }
+
+ test("GenericInternalRow roundtrip for TIMESTAMP_LTZ nanos") {
+ val row = new GenericInternalRow(Array[Any](ltzValue, null))
+ val accessor = InternalRow.getAccessor(TimestampLTZNanosType(8))
+ val writer = InternalRow.getWriter(0, TimestampLTZNanosType(8))
+ assert(accessor(row, 0) === ltzValue)
+ assert(accessor(row, 1) === null)
+
+ val row2 = new GenericInternalRow(Array[Any](null, null))
+ writer(row2, ltzValue)
+ assert(accessor(row2, 0) === ltzValue)
+ }
+
+ testBothCodegenAndInterpreted("UnsafeRow roundtrip for nanos timestamp columns") {
+ val schema = StructType(Seq(
+ StructField("ntz", TimestampNTZNanosType(9), nullable = true),
+ StructField("ltz", TimestampLTZNanosType(7), nullable = true)))
+ val fieldTypes = schema.map(_.dataType).toArray
+ val converter = UnsafeProjection.create(fieldTypes)
+
+ val input = new SpecificInternalRow(fieldTypes.toIndexedSeq)
+ input.update(0, ntzValue)
+ input.update(1, ltzValue)
+
+ val unsafeRow = converter.apply(input)
+ assert(unsafeRow.getTimestampNTZNanos(0) === ntzValue)
+ assert(unsafeRow.getTimestampLTZNanos(1) === ltzValue)
+
+ val updatedNtz = TimestampNanosVal.fromParts(1L, 0.toShort)
+ unsafeRow.setTimestampNTZNanos(0, updatedNtz)
+ assert(unsafeRow.getTimestampNTZNanos(0) === updatedNtz)
+
+ val offset = unsafeRow.getLong(0) >>> 32
+ unsafeRow.setTimestampNTZNanos(0, null)
+ assert(unsafeRow.getTimestampNTZNanos(0) === null)
+ assert(unsafeRow.getLong(0) >>> 32 === offset)
+ }
+
+ // Nanosecond timestamps use the UnsafeRow variable-length region, like CalendarInterval.
+ // SPARK-41535 fixed null CalendarInterval columns not being marked null when an
+ // UnsafeProjection buffer is first created from a row of null intervals (see
+ // MutableProjectionSuite and UnsafeRowConverterSuite). The same applies here:
+ // nullAtCreation must have null bits set for every nanos column, and later
+ // setTimestampNTZNanos / setTimestampLTZNanos must work in place.
+ testBothCodegenAndInterpreted("nanos timestamps initialized as null in unsafe projection") {
+ val fieldTypes: Array[DataType] =
+ Array(TimestampNTZNanosType(9), TimestampLTZNanosType(7))
+ val converter = UnsafeProjection.create(fieldTypes)
+
+ val row = new SpecificInternalRow(fieldTypes.toImmutableArraySeq)
+ row.setTimestampNTZNanos(0, null)
+ row.setTimestampLTZNanos(1, null)
+
+ val nullAtCreation = converter.apply(row)
+
+ for (i <- 0 until row.numFields) {
+ assert(nullAtCreation.isNullAt(i))
+ }
+
+ val ntz = TimestampNanosVal.fromParts(100L, 50.toShort)
+ val ltz = TimestampNanosVal.fromParts(200L, 100.toShort)
+ nullAtCreation.setTimestampNTZNanos(0, ntz)
+ nullAtCreation.setTimestampLTZNanos(1, ltz)
+ assert(nullAtCreation.getTimestampNTZNanos(0) === ntz)
+ assert(nullAtCreation.getTimestampLTZNanos(1) === ltz)
+ }
+
+ // Exercises UnsafeArrayWriter.write(int, TimestampNanosVal) via the codegen path through
+ // GenerateUnsafeProjection.writeArrayToBuffer, mirroring the CalendarInterval-array tests
+ // in UnsafeRowConverterSuite.
+ testBothCodegenAndInterpreted("UnsafeArrayWriter for nanos timestamp arrays") {
+ val arrType = ArrayType(TimestampNTZNanosType(9), containsNull = true)
+ val converter = UnsafeProjection.create(Array[DataType](arrType))
+ val input = new GenericInternalRow(Array[Any](
+ new GenericArrayData(Array[Any](ntzValue, null, ntzValue))))
+ val output = converter.apply(input)
+ val arr = output.getArray(0)
+ assert(arr.numElements() == 3)
+ assert(arr.getTimestampNTZNanos(0) === ntzValue)
+ assert(arr.isNullAt(1))
+ assert(arr.getTimestampNTZNanos(2) === ntzValue)
+ }
+
+ testBothCodegenAndInterpreted("UnsafeArrayWriter for LTZ nanos timestamp arrays") {
+ val arrType = ArrayType(TimestampLTZNanosType(7), containsNull = true)
+ val converter = UnsafeProjection.create(Array[DataType](arrType))
+ val input = new GenericInternalRow(Array[Any](
+ new GenericArrayData(Array[Any](ltzValue, null, ltzValue))))
+ val output = converter.apply(input)
+ val arr = output.getArray(0)
+ assert(arr.numElements() == 3)
+ assert(arr.getTimestampLTZNanos(0) === ltzValue)
+ assert(arr.isNullAt(1))
+ assert(arr.getTimestampLTZNanos(2) === ltzValue)
+ }
+
+ // containsNull = false exercises the codegen branch in writeArrayToBuffer that elides
+ // the per-element isNullAt check.
+ testBothCodegenAndInterpreted("UnsafeArrayWriter for non-nullable nanos timestamp arrays") {
+ val arrType = ArrayType(TimestampNTZNanosType(9), containsNull = false)
+ val converter = UnsafeProjection.create(Array[DataType](arrType))
+ val input = new GenericInternalRow(Array[Any](
+ new GenericArrayData(Array[Any](ntzValue, ntzValue))))
+ val output = converter.apply(input)
+ val arr = output.getArray(0)
+ assert(arr.numElements() == 2)
+ assert(arr.getTimestampNTZNanos(0) === ntzValue)
+ assert(arr.getTimestampNTZNanos(1) === ntzValue)
+ }
+
+ testBothCodegenAndInterpreted("codegen projection reads nanos timestamp column") {
+ val boundRef = BoundReference(0, TimestampNTZNanosType(9), nullable = false)
+ val projection = GenerateUnsafeProjection.generate(Seq(boundRef))
+ val input = new GenericInternalRow(Array[Any](ntzValue))
+ val output = projection.apply(input)
+ assert(output.getTimestampNTZNanos(0) === ntzValue)
+ }
+
+ test("literal validation for nanosecond timestamp types") {
+ Literal.validateLiteralValue(ntzValue, TimestampNTZNanosType(9))
+ Literal.validateLiteralValue(ltzValue, TimestampLTZNanosType(7))
+ // NTZ and LTZ share the same physical TimestampNanosVal; logical type is schema metadata.
+ Literal.validateLiteralValue(ntzValue, TimestampLTZNanosType(7))
+ intercept[IllegalArgumentException] {
+ Literal.validateLiteralValue(0L, TimestampNTZNanosType(9))
+ }
+ }
+
+ test("checkEvaluation roundtrip for nanos timestamp Literal") {
+ checkEvaluation(Literal.create(ntzValue, TimestampNTZNanosType(9)), ntzValue)
+ checkEvaluation(Literal.create(ltzValue, TimestampLTZNanosType(7)), ltzValue)
+ }
+
+ test("null Literal of nanos timestamp type") {
+ checkEvaluation(Literal.create(null, TimestampNTZNanosType(9)), null)
+ checkEvaluation(Literal.create(null, TimestampLTZNanosType(7)), null)
+ }
+
+ testBothCodegenAndInterpreted("UnsafeRow handles extreme epoch micros for nanos") {
+ val fieldTypes: Array[DataType] = Array(TimestampNTZNanosType(9))
+ val converter = UnsafeProjection.create(fieldTypes)
+ val input = new SpecificInternalRow(fieldTypes.toImmutableArraySeq)
+
+ val maxVal = TimestampNanosVal.fromParts(Long.MaxValue, 999.toShort)
+ input.update(0, maxVal)
+ val maxRow = converter.apply(input)
+ assert(maxRow.getTimestampNTZNanos(0) === maxVal)
+
+ val minVal = TimestampNanosVal.fromParts(Long.MinValue, 0.toShort)
+ input.update(0, minVal)
+ val minRow = converter.apply(input)
+ assert(minRow.getTimestampNTZNanos(0) === minVal)
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimestampNanosRowValuesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimestampNanosRowValuesSuite.scala
new file mode 100644
index 000000000000..3fc3817fdfe8
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimestampNanosRowValuesSuite.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.unsafe.Platform
+
+/**
+ * Direct, byte-level tests for [[TimestampNanosRowValues]]. The roundtrip tests in
+ * [[TimestampNanosRowSuite]] write and read through the same helper, so a consistent
+ * encoder/decoder bug (swapped word order, wrong offset arithmetic) would pass them.
+ * These tests pin the layout (word 0 = epoch micros, word 1 = nanos in the low 16 bits with the
+ * upper 48 bits zero) against a raw byte buffer.
+ */
+class TimestampNanosRowValuesSuite extends SparkFunSuite {
+
+ private val BASE = Platform.BYTE_ARRAY_OFFSET
+
+ test("writePayload places epochMicros in word 0 and nanos in low 16 bits of word 1") {
+ val buf = new Array[Byte](16)
+ TimestampNanosRowValues.writePayload(buf, BASE, 0, 0x0123456789ABCDEFL, 42.toShort)
+ assert(Platform.getLong(buf, BASE) === 0x0123456789ABCDEFL)
+ // The whole 8-byte word at offset 8 equals nanos: low 16 bits = 42, upper 48 bits = 0.
+ assert(Platform.getLong(buf, BASE + 8) === 42L)
+ }
+
+ test("readEpochMicros and readNanosWithinMicro decompose the payload") {
+ val buf = new Array[Byte](16)
+ TimestampNanosRowValues.writePayload(buf, BASE, 0, -1L, 999.toShort)
+ assert(TimestampNanosRowValues.readEpochMicros(buf, BASE, 0) === -1L)
+ assert(TimestampNanosRowValues.readNanosWithinMicro(buf, BASE, 0) === 999.toShort)
+ }
+
+ test("readVal reconstructs the TimestampNanosVal") {
+ val buf = new Array[Byte](16)
+ TimestampNanosRowValues.writePayload(buf, BASE, 0, 1234567890123L, 500.toShort)
+ val v = TimestampNanosRowValues.readVal(buf, BASE, 0)
+ assert(v.epochMicros === 1234567890123L)
+ assert(v.nanosWithinMicro === 500.toShort)
+ }
+
+ test("zeroPayload clears both words") {
+ val buf = new Array[Byte](16)
+ TimestampNanosRowValues.writePayload(buf, BASE, 0, -1L, 42.toShort)
+ TimestampNanosRowValues.zeroPayload(buf, BASE, 0)
+ assert(Platform.getLong(buf, BASE) === 0L)
+ assert(Platform.getLong(buf, BASE + 8) === 0L)
+ }
+
+ test("writePayload accepts boundary nanos values 0 and 999") {
+ val buf = new Array[Byte](16)
+ TimestampNanosRowValues.writePayload(buf, BASE, 0, 0L, 0.toShort)
+ assert(TimestampNanosRowValues.readNanosWithinMicro(buf, BASE, 0) === 0.toShort)
+ TimestampNanosRowValues.writePayload(buf, BASE, 0, 0L, 999.toShort)
+ assert(TimestampNanosRowValues.readNanosWithinMicro(buf, BASE, 0) === 999.toShort)
+ }
+
+ test("writePayload preserves Long.MinValue and Long.MaxValue epoch micros") {
+ val buf = new Array[Byte](16)
+ TimestampNanosRowValues.writePayload(buf, BASE, 0, Long.MaxValue, 999.toShort)
+ assert(TimestampNanosRowValues.readEpochMicros(buf, BASE, 0) === Long.MaxValue)
+ assert(TimestampNanosRowValues.readNanosWithinMicro(buf, BASE, 0) === 999.toShort)
+
+ TimestampNanosRowValues.writePayload(buf, BASE, 0, Long.MinValue, 0.toShort)
+ assert(TimestampNanosRowValues.readEpochMicros(buf, BASE, 0) === Long.MinValue)
+ assert(TimestampNanosRowValues.readNanosWithinMicro(buf, BASE, 0) === 0.toShort)
+ }
+
+ test("writePayload honours the cursor offset") {
+ // Two payloads back-to-back; reads at the corresponding cursors must not see each other.
+ val buf = new Array[Byte](32)
+ TimestampNanosRowValues.writePayload(buf, BASE, 0, 111L, 1.toShort)
+ TimestampNanosRowValues.writePayload(buf, BASE, 16, 222L, 2.toShort)
+ assert(TimestampNanosRowValues.readEpochMicros(buf, BASE, 0) === 111L)
+ assert(TimestampNanosRowValues.readNanosWithinMicro(buf, BASE, 0) === 1.toShort)
+ assert(TimestampNanosRowValues.readEpochMicros(buf, BASE, 16) === 222L)
+ assert(TimestampNanosRowValues.readNanosWithinMicro(buf, BASE, 16) === 2.toShort)
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjectionSuite.scala
index 9c0d610f35f6..e4b875d222c3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjectionSuite.scala
@@ -90,6 +90,8 @@ object AlwaysNull extends InternalRow {
override def getGeography(ordinal: Int): GeographyVal = notSupported
override def getGeometry(ordinal: Int): GeometryVal = notSupported
override def getInterval(ordinal: Int): CalendarInterval = notSupported
+ override def getTimestampNTZNanos(ordinal: Int): TimestampNanosVal = notSupported
+ override def getTimestampLTZNanos(ordinal: Int): TimestampNanosVal = notSupported
override def getVariant(ordinal: Int): VariantVal = notSupported
override def getStruct(ordinal: Int, numFields: Int): InternalRow = notSupported
override def getArray(ordinal: Int): ArrayData = notSupported
@@ -122,6 +124,8 @@ object AlwaysNonNull extends InternalRow {
override def getGeography(ordinal: Int): GeographyVal = notSupported
override def getGeometry(ordinal: Int): GeometryVal = notSupported
override def getInterval(ordinal: Int): CalendarInterval = notSupported
+ override def getTimestampNTZNanos(ordinal: Int): TimestampNanosVal = notSupported
+ override def getTimestampLTZNanos(ordinal: Int): TimestampNanosVal = notSupported
override def getVariant(ordinal: Int): VariantVal = notSupported
override def getStruct(ordinal: Int, numFields: Int): InternalRow = notSupported
override def getArray(ordinal: Int): ArrayData = stringToUTF8Array(Array("1", "2", "3"))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
index ad09b7411e65..f0c965f29889 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
@@ -426,6 +426,15 @@ class DataTypeSuite extends SparkFunSuite with SQLHelper {
checkDefaultSize(TimestampNTZNanosType(TimestampNTZNanosType.MIN_PRECISION), 10)
checkDefaultSize(TimestampNTZNanosType(TimestampNTZNanosType.MAX_PRECISION), 10)
+ test("PhysicalDataType for nanosecond timestamp types") {
+ for (p <- TimestampNTZNanosType.MIN_PRECISION to TimestampNTZNanosType.MAX_PRECISION) {
+ assert(PhysicalDataType(TimestampNTZNanosType(p)) != UninitializedPhysicalType)
+ }
+ for (p <- TimestampLTZNanosType.MIN_PRECISION to TimestampLTZNanosType.MAX_PRECISION) {
+ assert(PhysicalDataType(TimestampLTZNanosType(p)) != UninitializedPhysicalType)
+ }
+ }
+
def checkEqualsIgnoreCompatibleNullability(
from: DataType,
to: DataType,
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
index a46b5143eef6..b06dc50d9018 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
@@ -29,6 +29,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.sql.vectorized.ColumnarRow;
import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.TimestampNanosVal;
import org.apache.spark.unsafe.types.GeographyVal;
import org.apache.spark.unsafe.types.GeometryVal;
import org.apache.spark.unsafe.types.UTF8String;
@@ -165,6 +166,16 @@ public final class MutableColumnarRow extends InternalRow {
return columns[ordinal].getInterval(rowId);
}
+ @Override
+ public TimestampNanosVal getTimestampNTZNanos(int ordinal) {
+ return columns[ordinal].getTimestampNTZNanos(rowId);
+ }
+
+ @Override
+ public TimestampNanosVal getTimestampLTZNanos(int ordinal) {
+ return columns[ordinal].getTimestampLTZNanos(rowId);
+ }
+
@Override
public VariantVal getVariant(int ordinal) {
return columns[ordinal].getVariant(rowId);
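For readers who want to see the payload layout that the byte-level tests above pin down, here is a minimal standalone sketch in Java. It is not Spark's `TimestampNanosRowValues`: the class `NanosPayloadSketch` and all of its method names are hypothetical, and it uses a plain `java.nio.ByteBuffer` in native byte order instead of `Platform` (both are assumptions made only for illustration). It shows the two ideas from the commit description: a 16-byte payload with `epochMicros` in word 0 and `nanosWithinMicro` in the low 16 bits of word 1, and an 8-byte field slot packed as `offset << 32 | size`.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical illustration only; not part of Spark.
final class NanosPayloadSketch {
    // Two 8-byte words: epoch micros, then nanos-within-micro plus zero padding.
    static final int PAYLOAD_BYTES = 16;

    // Pack a variable-length field slot as (offset << 32 | size).
    static long packOffsetAndSize(int offset, int size) {
        return ((long) offset << 32) | (size & 0xFFFFFFFFL);
    }

    static int offsetOf(long slot) {
        return (int) (slot >>> 32);
    }

    static int sizeOf(long slot) {
        return (int) slot;
    }

    // Word 0 holds epochMicros; word 1 holds nanos in its low 16 bits, upper 48 bits zero.
    static void writePayload(ByteBuffer buf, int cursor, long epochMicros, short nanosWithinMicro) {
        buf.putLong(cursor, epochMicros);
        buf.putLong(cursor + 8, nanosWithinMicro & 0xFFFFL);
    }

    static long readEpochMicros(ByteBuffer buf, int cursor) {
        return buf.getLong(cursor);
    }

    static short readNanosWithinMicro(ByteBuffer buf, int cursor) {
        return (short) buf.getLong(cursor + 8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(2 * PAYLOAD_BYTES).order(ByteOrder.nativeOrder());
        // Two payloads back-to-back, each read back at its own cursor.
        writePayload(buf, 0, 111L, (short) 1);
        writePayload(buf, 16, 222L, (short) 2);
        System.out.println(readEpochMicros(buf, 16) + " " + readNanosWithinMicro(buf, 16)); // prints "222 2"

        long slot = packOffsetAndSize(24, PAYLOAD_BYTES);
        System.out.println(offsetOf(slot) + " " + sizeOf(slot)); // prints "24 16"
    }
}
```

Because the payload always occupies the same 16 bytes, a value can be overwritten at its existing offset, which is presumably why the roundtrip test checks that `unsafeRow.getLong(0) >>> 32` is unchanged after setting the column to null.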
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]