This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a7f3b5f2db08 [SPARK-52460][SQL] Store internal TIME values as nanoseconds
a7f3b5f2db08 is described below
commit a7f3b5f2db087ed8d7b5f35c5c8111d528d24da3
Author: Max Gekk <[email protected]>
AuthorDate: Sat Jun 14 23:23:57 2025 +0300
[SPARK-52460][SQL] Store internal TIME values as nanoseconds
### What changes were proposed in this pull request?
In this PR, I propose to store internal TIME values as nanoseconds since midnight in a `Long`, instead of microseconds in a `Long`.
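For illustration only (not part of the patch), a minimal sketch of the proposed mapping, mirroring the `localTimeToNanos`/`nanosToLocalTime` helpers added in the diff below:
```
import java.time.LocalTime
import java.time.temporal.ChronoField.NANO_OF_DAY

// A TIME value is carried internally as nanoseconds since midnight in a Long.
val t = LocalTime.parse("23:59:59.999999999")
val nanos: Long = t.getLong(NANO_OF_DAY)            // 86399999999999
val roundTrip: LocalTime = LocalTime.ofNanoOfDay(nanos)
assert(roundTrip == t)
```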
### Why are the changes needed?
A `Long` with nanosecond precision can hold the full range of TIME values, from 0 to 24 * 60 * 60 * 1000 * 1000 * 1000 - 1, which is well below the maximum of `Long`. This will simplify support of `TIME(9)`, for example.
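Concretely (illustrative arithmetic only), the maximum nanoseconds-of-day fits in a `Long` with large headroom:
```
val maxNanosOfDay = 24L * 60 * 60 * 1000 * 1000 * 1000 - 1  // 86,399,999,999,999
assert(maxNanosOfDay < Long.MaxValue)                       // Long.MaxValue ~ 9.22e18
```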
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
$ build/sbt "test:testOnly *TimeExpressionsSuite"
$ build/sbt "test:testOnly *TimeFormatterSuite"
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #51156 from MaxGekk/time-nanos.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../sql/catalyst/util/SparkDateTimeUtils.scala | 27 ++++++----
.../spark/sql/catalyst/util/TimeFormatter.scala | 12 ++---
.../sql/catalyst/CatalystTypeConverters.scala | 6 +--
.../sql/catalyst/DeserializerBuildHelper.scala | 2 +-
.../spark/sql/catalyst/SerializerBuildHelper.scala | 2 +-
.../spark/sql/catalyst/expressions/literals.scala | 4 +-
.../spark/sql/catalyst/util/DateTimeUtils.scala | 18 +++----
.../org/apache/spark/sql/RandomDataGenerator.scala | 2 +-
.../sql/catalyst/CatalystTypeConvertersSuite.scala | 4 +-
.../sql/catalyst/encoders/RowEncoderSuite.scala | 2 +-
.../expressions/HashExpressionsSuite.scala | 6 +--
.../catalyst/expressions/LiteralGenerator.scala | 4 +-
.../catalyst/expressions/ToPrettyStringSuite.scala | 6 +--
.../sql/catalyst/util/DateTimeTestUtils.scala | 6 +--
.../sql/catalyst/util/DateTimeUtilsSuite.scala | 24 ++++-----
.../sql/catalyst/util/TimeFormatterSuite.scala | 57 ++++++++++++----------
.../parquet/ParquetVectorUpdaterFactory.java | 44 ++++++++++++++++-
.../parquet/VectorizedColumnReader.java | 3 +-
.../datasources/parquet/ParquetFilters.scala | 23 ++++-----
.../datasources/parquet/ParquetRowConverter.scala | 3 +-
.../datasources/parquet/ParquetWriteSupport.scala | 6 ++-
.../datasources/parquet/ParquetIOSuite.scala | 4 +-
22 files changed, 165 insertions(+), 100 deletions(-)
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
index 6a51799e1132..b88600b8b8af 100644
---
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
+++
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.util
import java.lang.invoke.{MethodHandles, MethodType}
import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate, LocalDateTime, LocalTime, ZonedDateTime, ZoneId, ZoneOffset}
-import java.time.temporal.ChronoField.MICRO_OF_DAY
+import java.time.temporal.ChronoField.{MICRO_OF_DAY, NANO_OF_DAY}
import java.util.TimeZone
import java.util.concurrent.TimeUnit.{MICROSECONDS, NANOSECONDS}
import java.util.regex.Pattern
@@ -83,6 +83,12 @@ trait SparkDateTimeUtils {
case ldt: LocalDateTime => localDateTimeToMicros(ldt)
}
+ /**
+ * Converts the time to microseconds since midnight. In Spark time values have nanoseconds
+ * precision, so this conversion is lossy.
+ */
+ def nanosToMicros(nanos: Long): Long = Math.floorDiv(nanos, MICROS_PER_MILLIS)
+
/**
* Converts the timestamp to milliseconds since epoch. In Spark timestamp values have
* microseconds precision, so this conversion is lossy.
@@ -101,6 +107,11 @@ trait SparkDateTimeUtils {
Math.multiplyExact(millis, MICROS_PER_MILLIS)
}
+ /**
+ * Converts microseconds since the midnight to nanoseconds.
+ */
+ def microsToNanos(micros: Long): Long = Math.multiplyExact(micros, NANOS_PER_MICROS)
+
// See issue SPARK-35679
// min second cause overflow in instant to micro
private val MIN_SECONDS = Math.floorDiv(Long.MinValue, MICROS_PER_SECOND)
@@ -225,17 +236,15 @@ trait SparkDateTimeUtils {
}
/**
- * Converts the local time to the number of microseconds within the day, from 0 to (24 * 60 * 60
- * * 1000000) - 1.
+ * Converts the local time to the number of nanoseconds within the day, from 0 to (24 * 60 * 60
+ * * 1000 * 1000 * 1000) - 1.
*/
- def localTimeToMicros(localTime: LocalTime): Long = localTime.getLong(MICRO_OF_DAY)
+ def localTimeToNanos(localTime: LocalTime): Long = localTime.getLong(NANO_OF_DAY)
/**
- * Converts the number of microseconds within the day to the local time.
+ * Converts the number of nanoseconds within the day to the local time.
*/
- def microsToLocalTime(micros: Long): LocalTime = {
- LocalTime.ofNanoOfDay(Math.multiplyExact(micros, NANOS_PER_MICROS))
- }
+ def nanosToLocalTime(nanos: Long): LocalTime = LocalTime.ofNanoOfDay(nanos)
/**
* Converts a local date at the default JVM time zone to the number of days since 1970-01-01 in
@@ -716,7 +725,7 @@ trait SparkDateTimeUtils {
}
val nanoseconds = MICROSECONDS.toNanos(segments(6))
val localTime = LocalTime.of(segments(3), segments(4), segments(5), nanoseconds.toInt)
- Some(localTimeToMicros(localTime))
+ Some(localTimeToNanos(localTime))
} catch {
case NonFatal(_) => None
}
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala
index 46afbc8aca19..a159f74572f1 100644
---
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala
+++
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala
@@ -25,11 +25,11 @@ import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils._
import org.apache.spark.unsafe.types.UTF8String
sealed trait TimeFormatter extends Serializable {
- def parse(s: String): Long // returns microseconds since midnight
+ def parse(s: String): Long // returns nanoseconds since midnight
def format(localTime: LocalTime): String
- // Converts microseconds since the midnight to time string
- def format(micros: Long): String
+ // Converts nanoseconds since the midnight to time string
+ def format(nanos: Long): String
def validatePatternString(): Unit
}
@@ -47,15 +47,15 @@ class Iso8601TimeFormatter(pattern: String, locale: Locale, isParsing: Boolean)
override def parse(s: String): Long = {
val localTime = toLocalTime(formatter.parse(s))
- localTimeToMicros(localTime)
+ localTimeToNanos(localTime)
}
override def format(localTime: LocalTime): String = {
localTime.format(formatter)
}
- override def format(micros: Long): String = {
- format(microsToLocalTime(micros))
+ override def format(nanos: Long): String = {
+ format(nanosToLocalTime(nanos))
}
override def validatePatternString(): Unit = {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index bb6afb3b13fa..440cfb713242 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -375,14 +375,14 @@ object CatalystTypeConverters {
private object TimeConverter extends CatalystTypeConverter[LocalTime, LocalTime, Any] {
override def toCatalystImpl(scalaValue: LocalTime): Long = {
- DateTimeUtils.localTimeToMicros(scalaValue)
+ DateTimeUtils.localTimeToNanos(scalaValue)
}
override def toScala(catalystValue: Any): LocalTime = {
if (catalystValue == null) null
- else DateTimeUtils.microsToLocalTime(catalystValue.asInstanceOf[Long])
+ else DateTimeUtils.nanosToLocalTime(catalystValue.asInstanceOf[Long])
}
override def toScalaImpl(row: InternalRow, column: Int): LocalTime =
- DateTimeUtils.microsToLocalTime(row.getLong(column))
+ DateTimeUtils.nanosToLocalTime(row.getLong(column))
}
private object TimestampConverter extends CatalystTypeConverter[Any, Timestamp, Any] {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
index 9b22f28ed12d..15de70e35a45 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
@@ -160,7 +160,7 @@ object DeserializerBuildHelper {
StaticInvoke(
DateTimeUtils.getClass,
ObjectType(classOf[java.time.LocalTime]),
- "microsToLocalTime",
+ "nanosToLocalTime",
path :: Nil,
returnNullable = false)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
index c8bf1f523799..82b3cdc508bf 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
@@ -103,7 +103,7 @@ object SerializerBuildHelper {
StaticInvoke(
DateTimeUtils.getClass,
TimeType(),
- "localTimeToMicros",
+ "localTimeToNanos",
inputObject :: Nil,
returnNullable = false)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index e3ed2c4a0b0b..925735654b73 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -49,7 +49,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern
import org.apache.spark.sql.catalyst.trees.TreePattern.{LITERAL, NULL_LITERAL, TRUE_OR_FALSE_LITERAL}
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localTimeToMicros}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localTimeToNanos}
import org.apache.spark.sql.catalyst.util.IntervalStringStyles.ANSI_STYLE
import org.apache.spark.sql.catalyst.util.IntervalUtils.{durationToMicros, periodToMonths, toDayTimeIntervalString, toYearMonthIntervalString}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -89,7 +89,7 @@ object Literal {
case l: LocalDateTime => Literal(DateTimeUtils.localDateTimeToMicros(l), TimestampNTZType)
case ld: LocalDate => Literal(ld.toEpochDay.toInt, DateType)
case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType)
- case lt: LocalTime => Literal(localTimeToMicros(lt), TimeType())
+ case lt: LocalTime => Literal(localTimeToNanos(lt), TimeType())
case d: Duration => Literal(durationToMicros(d), DayTimeIntervalType())
case p: Period => Literal(periodToMonths(p), YearMonthIntervalType())
case a: Array[Byte] => Literal(a, BinaryType)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index d7cbb9886ba1..cd811bc9749f 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -109,7 +109,7 @@ object DateTimeUtils extends SparkDateTimeUtils {
* Returns the hour value of a given TIME (TimeType) value.
*/
def getHoursOfTime(micros: Long): Int = {
- microsToLocalTime(micros).getHour
+ nanosToLocalTime(micros).getHour
}
/**
@@ -124,7 +124,7 @@ object DateTimeUtils extends SparkDateTimeUtils {
* Returns the minute value of a given TIME (TimeType) value.
*/
def getMinutesOfTime(micros: Long): Int = {
- microsToLocalTime(micros).getMinute
+ nanosToLocalTime(micros).getMinute
}
/**
@@ -139,7 +139,7 @@ object DateTimeUtils extends SparkDateTimeUtils {
* Returns the second value of a given TIME (TimeType) value.
*/
def getSecondsOfTime(micros: Long): Int = {
- microsToLocalTime(micros).getSecond
+ nanosToLocalTime(micros).getSecond
}
/**
* Returns the seconds part and its fractional part with microseconds.
@@ -151,16 +151,16 @@ object DateTimeUtils extends SparkDateTimeUtils {
/**
* Returns the second value with fraction from a given TIME (TimeType) value.
- * @param micros
- * The number of microseconds since the epoch.
+ * @param nanos
+ * The number of nanoseconds since the epoch.
* @param precision
* The time fractional seconds precision, which indicates the number of decimal digits
* maintained.
*/
- def getSecondsOfTimeWithFraction(micros: Long, precision: Int): Decimal = {
- val seconds = (micros / MICROS_PER_SECOND) % SECONDS_PER_MINUTE
+ def getSecondsOfTimeWithFraction(nanos: Long, precision: Int): Decimal = {
+ val seconds = (nanos / NANOS_PER_SECOND) % SECONDS_PER_MINUTE
val scaleFactor = math.pow(10, precision).toLong
- val scaledFraction = (micros % MICROS_PER_SECOND) * scaleFactor / MICROS_PER_SECOND
+ val scaledFraction = (nanos % NANOS_PER_SECOND) * scaleFactor / NANOS_PER_SECOND
val fraction = scaledFraction.toDouble / scaleFactor
Decimal(seconds + fraction, 8, 6)
}
@@ -816,7 +816,7 @@ object DateTimeUtils extends SparkDateTimeUtils {
val nanos = Math.floorMod(unscaledSecFrac, MICROS_PER_SECOND) * NANOS_PER_MICROS
val lt = LocalTime.of(hours, minutes, fullSecs.toInt, nanos.toInt)
- localTimeToMicros(lt)
+ localTimeToNanos(lt)
} catch {
case e @ (_: DateTimeException | _: ArithmeticException) =>
throw
QueryExecutionErrors.ansiDateTimeArgumentOutOfRangeWithoutSuggestion(e)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
index 2f92fe3d083d..de916d13b1ba 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
@@ -292,7 +292,7 @@ object RandomDataGenerator {
randomNumeric[LocalTime](
rand,
(rand: Random) => {
- DateTimeUtils.microsToLocalTime(rand.between(0, 24 * 60 * 60 * 1000 * 1000L))
+ DateTimeUtils.nanosToLocalTime(rand.between(0, 24 * 60 * 60 * 1000 * 1000L))
},
specialTimes.map(LocalTime.parse)
)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
index e4c48f7467f9..00d8bd6633a9 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
@@ -435,7 +435,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
"23:59:59.999999").foreach { time =>
val input = LocalTime.parse(time)
val result = CatalystTypeConverters.convertToCatalyst(input)
- val expected = DateTimeUtils.localTimeToMicros(input)
+ val expected = DateTimeUtils.localTimeToNanos(input)
assert(result === expected)
}
}
@@ -449,7 +449,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
43200999999L,
86399000000L,
86399999999L).foreach { us =>
- val localTime = DateTimeUtils.microsToLocalTime(us)
+ val localTime = DateTimeUtils.nanosToLocalTime(us)
assert(CatalystTypeConverters.createToScalaConverter(TimeType())(us) === localTime)
}
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
index 1609e1a4e113..05ccaf2cbda0 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
@@ -377,7 +377,7 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
val encoder = ExpressionEncoder(schema).resolveAndBind()
val localTime = java.time.LocalTime.parse("20:38:45.123456")
val row = toRow(encoder, Row(localTime))
- assert(row.getLong(0) === DateTimeUtils.localTimeToMicros(localTime))
+ assert(row.getLong(0) === DateTimeUtils.localTimeToNanos(localTime))
val readback = fromRow(encoder, row)
assert(readback.get(0).equals(localTime))
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
index dddc33aa4358..c64b94703288 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
@@ -756,9 +756,9 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
test("Support TimeType") {
val time = Literal.create(LocalTime.of(23, 50, 59, 123456000), TimeType())
- checkEvaluation(Murmur3Hash(Seq(time), 10), 258472763)
- checkEvaluation(XxHash64(Seq(time), 10), -9197489935839400467L)
- checkEvaluation(HiveHash(Seq(time)), -40222445)
+ checkEvaluation(Murmur3Hash(Seq(time), 10), 545499634)
+ checkEvaluation(XxHash64(Seq(time), 10), -3550518982366774761L)
+ checkEvaluation(HiveHash(Seq(time)), -1567775210)
}
private def testHash(inputSchema: StructType): Unit = {
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala
index ed5843478c00..bada54135653 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala
@@ -125,8 +125,8 @@ object LiteralGenerator {
lazy val timeLiteralGen: Gen[Literal] = {
// Valid range for TimeType is [00:00:00, 23:59:59.999999]
- val minTime = DateTimeUtils.localTimeToMicros(LocalTime.MIN)
- val maxTime = DateTimeUtils.localTimeToMicros(LocalTime.MAX)
+ val minTime = DateTimeUtils.localTimeToNanos(LocalTime.MIN)
+ val maxTime = DateTimeUtils.localTimeToNanos(LocalTime.MAX)
for { t <- Gen.choose(minTime, maxTime) }
yield Literal(t, TimeType())
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ToPrettyStringSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ToPrettyStringSuite.scala
index 5c297c00acc0..6a2651edd9ab 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ToPrettyStringSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ToPrettyStringSuite.scala
@@ -136,9 +136,9 @@ class ToPrettyStringSuite extends SparkFunSuite with ExpressionEvalHelper {
}
test("Time as pretty strings") {
- checkEvaluation(ToPrettyString(Literal(1000L, TimeType())), "00:00:00.001")
- checkEvaluation(ToPrettyString(Literal(1L, TimeType())), "00:00:00.000001")
+ checkEvaluation(ToPrettyString(Literal(1000 * 1000L, TimeType())), "00:00:00.001")
+ checkEvaluation(ToPrettyString(Literal(1000L, TimeType())), "00:00:00.000001")
checkEvaluation(ToPrettyString(Literal(
- (23 * 3600 + 59 * 60 + 59) * 1000000L, TimeType())), "23:59:59")
+ (23 * 3600 + 59 * 60 + 59) * 1000000000L, TimeType())), "23:59:59")
}
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
index f281c42bbe71..b17a22778801 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, localTimeToMicros}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, localTimeToNanos}
/**
* Helper functions for testing date and time functionality.
@@ -113,7 +113,7 @@ object DateTimeTestUtils {
result
}
- // Returns microseconds since midnight
+ // Returns nanoseconds since midnight
def localTime(
hour: Byte = 0,
minute: Byte = 0,
@@ -121,6 +121,6 @@ object DateTimeTestUtils {
micros: Int = 0): Long = {
val nanos = TimeUnit.MICROSECONDS.toNanos(micros).toInt
val localTime = LocalTime.of(hour, minute, sec, nanos)
- localTimeToMicros(localTime)
+ localTimeToNanos(localTime)
}
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 24258a2268ba..0307b6d944fb 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -1107,25 +1107,27 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
"invalidValue" -> "'SECS'"))
}
- test("localTimeToMicros and microsToLocalTime") {
- assert(microsToLocalTime(0) === LocalTime.of(0, 0))
- assert(localTimeToMicros(LocalTime.of(0, 0)) === 0)
+ test("localTimeToNanos and nanosToLocalTime") {
+ assert(nanosToLocalTime(0) === LocalTime.of(0, 0))
+ assert(localTimeToNanos(LocalTime.of(0, 0)) === 0)
- assert(localTimeToMicros(microsToLocalTime(123456789)) === 123456789)
+ assert(localTimeToNanos(nanosToLocalTime(123456789123L)) === 123456789123L)
- assert(localTimeToMicros(LocalTime.parse("23:59:59.999999")) === (24L * 60 * 60 * 1000000 - 1))
- assert(microsToLocalTime(24L * 60 * 60 * 1000000 - 1) === LocalTime.of(23, 59, 59, 999999000))
+ assert(localTimeToNanos(LocalTime.parse("23:59:59.999999999")) ===
+ (24L * 60 * 60 * 1000 * 1000 * 1000 - 1))
+ assert(nanosToLocalTime(24L * 60 * 60 * 1000 * 1000 * 1000 - 1) ===
+ LocalTime.of(23, 59, 59, 999999999))
- Seq(-1, 24L * 60 * 60 * 1000000).foreach { invalidMicros =>
+ Seq(-1, 24L * 60 * 60 * 1000 * 1000 * 1000L).foreach { invalidMicros =>
val msg = intercept[DateTimeException] {
- microsToLocalTime(invalidMicros)
+ nanosToLocalTime(invalidMicros)
}.getMessage
assert(msg.contains("Invalid value"))
}
- val msg = intercept[ArithmeticException] {
- microsToLocalTime(Long.MaxValue)
+ val msg = intercept[DateTimeException] {
+ nanosToLocalTime(Long.MaxValue)
}.getMessage
- assert(msg == "long overflow")
+ assert(msg.contains("Invalid value"))
}
test("stringToTime") {
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimeFormatterSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimeFormatterSuite.scala
index d99ea2bd1042..9a707d80d248 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimeFormatterSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimeFormatterSuite.scala
@@ -24,22 +24,24 @@ import scala.util.Random
import org.apache.spark.{SPARK_DOC_ROOT, SparkDateTimeException, SparkFunSuite, SparkRuntimeException}
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToLocalTime
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.nanosToLocalTime
class TimeFormatterSuite extends SparkFunSuite with SQLHelper {
test("time parsing") {
Seq(
- ("12", "HH") -> 12 * 3600 * 1000000L,
- ("01:02", "HH:mm") -> (1 * 3600 + 2 * 60) * 1000000L,
- ("10:20", "HH:mm") -> (10 * 3600 + 20 * 60) * 1000000L,
+ ("12", "HH") -> 12 * 3600 * 1000000000L,
+ ("01:02", "HH:mm") -> (1 * 3600 + 2 * 60) * 1000000000L,
+ ("10:20", "HH:mm") -> (10 * 3600 + 20 * 60) * 1000000000L,
("00:00:00", "HH:mm:ss") -> 0L,
- ("01:02:03", "HH:mm:ss") -> (1 * 3600 + 2 * 60 + 3) * 1000000L,
- ("23:59:59", "HH:mm:ss") -> (23 * 3600 + 59 * 60 + 59) * 1000000L,
+ ("01:02:03", "HH:mm:ss") -> (1 * 3600 + 2 * 60 + 3) * 1000000000L,
+ ("23:59:59", "HH:mm:ss") -> (23 * 3600 + 59 * 60 + 59) * 1000000000L,
("00:00:00.000000", "HH:mm:ss.SSSSSS") -> 0L,
- ("12:34:56.789012", "HH:mm:ss.SSSSSS") -> ((12 * 3600 + 34 * 60 + 56) *
1000000L + 789012),
- ("23:59:59.000000", "HH:mm:ss.SSSSSS") -> (23 * 3600 + 59 * 60 + 59) *
1000000L,
- ("23:59:59.999999", "HH:mm:ss.SSSSSS") -> ((23 * 3600 + 59 * 60 + 59) *
1000000L + 999999)
+ ("12:34:56.789012", "HH:mm:ss.SSSSSS") ->
+ ((12 * 3600 + 34 * 60 + 56) * 1000000000L + 789012000),
+ ("23:59:59.000000", "HH:mm:ss.SSSSSS") -> (23 * 3600 + 59 * 60 + 59) *
1000000000L,
+ ("23:59:59.999999", "HH:mm:ss.SSSSSS") ->
+ ((23 * 3600 + 59 * 60 + 59) * 1000000000L + 999999000)
).foreach { case ((inputStr, pattern), expectedMicros) =>
val formatter = TimeFormatter(format = pattern, isParsing = true)
assert(formatter.parse(inputStr) === expectedMicros)
@@ -60,16 +62,18 @@ class TimeFormatterSuite extends SparkFunSuite with SQLHelper {
test("time formatting") {
Seq(
- (12 * 3600 * 1000000L, "HH") -> "12",
- ((1 * 3600 + 2 * 60) * 1000000L, "HH:mm") -> "01:02",
- ((10 * 3600 + 20 * 60) * 1000000L, "HH:mm") -> "10:20",
+ (12 * 3600 * 1000000000L, "HH") -> "12",
+ ((1 * 3600 + 2 * 60) * 1000000000L, "HH:mm") -> "01:02",
+ ((10 * 3600 + 20 * 60) * 1000000000L, "HH:mm") -> "10:20",
(0L, "HH:mm:ss") -> "00:00:00",
- ((1 * 3600 + 2 * 60 + 3) * 1000000L, "HH:mm:ss") -> "01:02:03",
- ((23 * 3600 + 59 * 60 + 59) * 1000000L, "HH:mm:ss") -> "23:59:59",
+ ((1 * 3600 + 2 * 60 + 3) * 1000000000L, "HH:mm:ss") -> "01:02:03",
+ ((23 * 3600 + 59 * 60 + 59) * 1000000000L, "HH:mm:ss") -> "23:59:59",
(0L, "HH:mm:ss.SSSSSS") -> "00:00:00.000000",
- ((12 * 3600 + 34 * 60 + 56) * 1000000L + 789012, "HH:mm:ss.SSSSSS") -> "12:34:56.789012",
- ((23 * 3600 + 59 * 60 + 59) * 1000000L, "HH:mm:ss.SSSSSS") -> "23:59:59.000000",
- ((23 * 3600 + 59 * 60 + 59) * 1000000L + 999999, "HH:mm:ss.SSSSSS") -> "23:59:59.999999"
+ ((12 * 3600 + 34 * 60 + 56) * 1000000000L + 789012000, "HH:mm:ss.SSSSSS") ->
+ "12:34:56.789012",
+ ((23 * 3600 + 59 * 60 + 59) * 1000000000L, "HH:mm:ss.SSSSSS") -> "23:59:59.000000",
+ ((23 * 3600 + 59 * 60 + 59) * 1000000000L + 999999000, "HH:mm:ss.SSSSSS") ->
+ "23:59:59.999999"
).foreach { case ((micros, pattern), expectedStr) =>
val formatter = TimeFormatter(format = pattern)
assert(formatter.format(micros) === expectedStr)
@@ -82,8 +86,8 @@ class TimeFormatterSuite extends SparkFunSuite with SQLHelper {
assert(e.getMessage.contains(expectedMsg))
}
- assertError(-1, "Invalid value for NanoOfDay (valid values 0 - 86399999999999): -1000")
- assertError(25L * 3600 * 1000 * 1000,
+ assertError(-1000, "Invalid value for NanoOfDay (valid values 0 - 86399999999999): -1000")
+ assertError(25L * 3600 * 1000 * 1000 * 1000,
"Invalid value for NanoOfDay (valid values 0 - 86399999999999): 90000000000000")
}
@@ -101,14 +105,14 @@ class TimeFormatterSuite extends SparkFunSuite with SQLHelper {
}
test("round trip with the default pattern: format -> parse") {
- val data = Seq.tabulate(10) { _ => Random.between(0, 24 * 60 * 60 * 1000000L) }
+ val data = Seq.tabulate(10) { _ => Random.between(0, 24 * 60 * 60 * 1000000L) * 1000L }
val pattern = "HH:mm:ss.SSSSSS"
val (formatter, parser) =
(TimeFormatter(pattern, isParsing = false), TimeFormatter(pattern, isParsing = true))
- data.foreach { micros =>
- val str = formatter.format(micros)
- assert(parser.parse(str) === micros, s"micros = $micros")
- assert(formatter.format(microsToLocalTime(micros)) === str)
+ data.foreach { nanos =>
+ val str = formatter.format(nanos)
+ assert(parser.parse(str) === nanos, s"nanos = $nanos")
+ assert(formatter.format(nanosToLocalTime(nanos)) === str)
}
}
@@ -120,8 +124,9 @@ class TimeFormatterSuite extends SparkFunSuite with SQLHelper {
1000 -> "00:00:00.001",
900000 -> "00:00:00.9",
1000000 -> "00:00:01").foreach { case (micros, tsStr) =>
- assert(formatter.format(micros) === tsStr)
- assert(formatter.format(microsToLocalTime(micros)) === tsStr)
+ val nanos = micros * 1000L
+ assert(formatter.format(nanos) === tsStr)
+ assert(formatter.format(nanosToLocalTime(nanos)) === tsStr)
}
}
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 889f11e11973..eb6c84b8113b 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -24,6 +24,7 @@ import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
@@ -159,7 +160,7 @@ public class ParquetVectorUpdaterFactory {
} else if (canReadAsDecimal(descriptor, sparkType)) {
return new LongToDecimalUpdater(descriptor, (DecimalType) sparkType);
} else if (sparkType instanceof TimeType) {
- return new LongUpdater();
+ return new LongAsNanosUpdater();
}
}
case FLOAT -> {
@@ -233,6 +234,11 @@ public class ParquetVectorUpdaterFactory {
annotation.getUnit() == unit;
}
+ boolean isTimeTypeMatched(LogicalTypeAnnotation.TimeUnit unit) {
+ return logicalTypeAnnotation instanceof TimeLogicalTypeAnnotation annotation &&
+ annotation.getUnit() == unit;
+ }
+
boolean isUnsignedIntTypeMatched(int bitWidth) {
return logicalTypeAnnotation instanceof IntLogicalTypeAnnotation annotation &&
!annotation.isSigned() && annotation.getBitWidth() == bitWidth;
@@ -825,6 +831,42 @@ public class ParquetVectorUpdaterFactory {
}
}
+ private static class LongAsNanosUpdater implements ParquetVectorUpdater {
+ @Override
+ public void readValues(
+ int total,
+ int offset,
+ WritableColumnVector values,
+ VectorizedValuesReader valuesReader) {
+ for (int i = 0; i < total; ++i) {
+ readValue(offset + i, values, valuesReader);
+ }
+ }
+
+ @Override
+ public void skipValues(int total, VectorizedValuesReader valuesReader) {
+ valuesReader.skipLongs(total);
+ }
+
+ @Override
+ public void readValue(
+ int offset,
+ WritableColumnVector values,
+ VectorizedValuesReader valuesReader) {
+ values.putLong(offset, DateTimeUtils.microsToNanos(valuesReader.readLong()));
+ }
+
+ @Override
+ public void decodeSingleDictionaryId(
+ int offset,
+ WritableColumnVector values,
+ WritableColumnVector dictionaryIds,
+ Dictionary dictionary) {
+ long micros = dictionary.decodeToLong(dictionaryIds.getDictId(offset));
+ values.putLong(offset, DateTimeUtils.microsToNanos(micros));
+ }
+ }
+
private static class FloatUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 731c78cf9450..6e1660dc8c87 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -165,7 +165,8 @@ public class VectorizedColumnReader {
case INT64: {
boolean isDecimal = sparkType instanceof DecimalType;
boolean needsUpcast = (isDecimal && !DecimalType.is64BitDecimalType(sparkType)) ||
- updaterFactory.isTimestampTypeMatched(TimeUnit.MILLIS);
+ updaterFactory.isTimestampTypeMatched(TimeUnit.MILLIS) ||
+ updaterFactory.isTimeTypeMatched(TimeUnit.MICROS);
boolean needsRebase = updaterFactory.isTimestampTypeMatched(TimeUnit.MICROS) &&
!"CORRECTED".equals(datetimeRebaseMode);
isSupported = !needsUpcast && !needsRebase && !needsDecimalScaleRebase(sparkType);
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 565742671b9c..4a9b17bf98e5 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -22,6 +22,7 @@ import java.math.{BigDecimal => JBigDecimal}
import java.nio.charset.StandardCharsets.UTF_8
import java.sql.{Date, Timestamp}
import java.time.{Duration, Instant, LocalDate, LocalTime, Period}
+import java.time.temporal.ChronoField.MICRO_OF_DAY
import java.util.HashSet
import java.util.Locale
@@ -149,7 +150,7 @@ class ParquetFilters(
ParquetSchemaType(LogicalTypeAnnotation.timestampType(true, TimeUnit.MICROS), INT64, 0)
private val ParquetTimestampMillisType =
ParquetSchemaType(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS), INT64, 0)
- private val ParquetTimeType =
+ private val ParquetTimeMicrosType =
ParquetSchemaType(LogicalTypeAnnotation.timeType(false, TimeUnit.MICROS), INT64, 0)
private def dateToDays(date: Any): Int = {
@@ -176,7 +177,7 @@ class ParquetFilters(
}
private def localTimeToMicros(v: Any): JLong = {
- DateTimeUtils.localTimeToMicros(v.asInstanceOf[LocalTime])
+ v.asInstanceOf[LocalTime].getLong(MICRO_OF_DAY)
}
private def decimalToInt32(decimal: JBigDecimal): Integer =
decimal.unscaledValue().intValue()
@@ -213,7 +214,7 @@ class ParquetFilters(
private def toLongValue(v: Any): JLong = v match {
case d: Duration => IntervalUtils.durationToMicros(d)
- case lt: LocalTime => DateTimeUtils.localTimeToMicros(lt)
+ case lt: LocalTime => localTimeToMicros(lt)
case l => l.asInstanceOf[JLong]
}
@@ -251,7 +252,7 @@ class ParquetFilters(
(n: Array[String], v: Any) => FilterApi.eq(
longColumn(n),
Option(v).map(timestampToMillis).orNull)
- case ParquetTimeType =>
+ case ParquetTimeMicrosType =>
(n: Array[String], v: Any) => FilterApi.eq(
longColumn(n),
Option(v).map(localTimeToMicros).orNull)
@@ -304,7 +305,7 @@ class ParquetFilters(
(n: Array[String], v: Any) => FilterApi.notEq(
longColumn(n),
Option(v).map(timestampToMillis).orNull)
- case ParquetTimeType =>
+ case ParquetTimeMicrosType =>
(n: Array[String], v: Any) => FilterApi.notEq(
longColumn(n),
Option(v).map(localTimeToMicros).orNull)
@@ -348,7 +349,7 @@ class ParquetFilters(
(n: Array[String], v: Any) => FilterApi.lt(longColumn(n),
timestampToMicros(v))
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.lt(longColumn(n),
timestampToMillis(v))
- case ParquetTimeType =>
+ case ParquetTimeMicrosType =>
(n: Array[String], v: Any) => FilterApi.lt(longColumn(n),
localTimeToMicros(v))
case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if
pushDownDecimal =>
@@ -387,7 +388,7 @@ class ParquetFilters(
(n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n),
timestampToMicros(v))
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n),
timestampToMillis(v))
- case ParquetTimeType =>
+ case ParquetTimeMicrosType =>
(n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n),
localTimeToMicros(v))
case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if
pushDownDecimal =>
@@ -426,7 +427,7 @@ class ParquetFilters(
(n: Array[String], v: Any) => FilterApi.gt(longColumn(n),
timestampToMicros(v))
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.gt(longColumn(n),
timestampToMillis(v))
- case ParquetTimeType =>
+ case ParquetTimeMicrosType =>
(n: Array[String], v: Any) => FilterApi.gt(longColumn(n),
localTimeToMicros(v))
case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if
pushDownDecimal =>
@@ -465,7 +466,7 @@ class ParquetFilters(
(n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n),
timestampToMicros(v))
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n),
timestampToMillis(v))
- case ParquetTimeType =>
+ case ParquetTimeMicrosType =>
(n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n),
localTimeToMicros(v))
case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if
pushDownDecimal =>
@@ -556,7 +557,7 @@ class ParquetFilters(
}
FilterApi.in(longColumn(n), set)
- case ParquetTimeType =>
+ case ParquetTimeMicrosType =>
(n: Array[String], values: Array[Any]) =>
val set = new HashSet[JLong]()
for (value <- values) {
@@ -661,7 +662,7 @@ class ParquetFilters(
value.isInstanceOf[Date] || value.isInstanceOf[LocalDate]
case ParquetTimestampMicrosType | ParquetTimestampMillisType =>
value.isInstanceOf[Timestamp] || value.isInstanceOf[Instant]
- case ParquetTimeType => value.isInstanceOf[LocalTime]
+ case ParquetTimeMicrosType => value.isInstanceOf[LocalTime]
case ParquetSchemaType(decimalType: DecimalLogicalTypeAnnotation, INT32,
_) =>
isDecimalMatched(value, decimalType)
case ParquetSchemaType(decimalType: DecimalLogicalTypeAnnotation, INT64,
_) =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 0927f5c3c963..cb5e7bf53215 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -487,7 +487,8 @@ private[parquet] class ParquetRowConverter(
.asInstanceOf[TimeLogicalTypeAnnotation].getUnit == TimeUnit.MICROS =>
new ParquetPrimitiveConverter(updater) {
override def addLong(value: Long): Unit = {
- this.updater.setLong(value)
+ val nanos = DateTimeUtils.microsToNanos(value)
+ this.updater.setLong(nanos)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 4022f7ea3003..9f1a815d7fa1 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -209,7 +209,7 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
(row: SpecializedGetters, ordinal: Int) =>
recordConsumer.addInteger(row.getInt(ordinal))
- case LongType | _: DayTimeIntervalType | _: TimeType =>
+ case LongType | _: DayTimeIntervalType =>
(row: SpecializedGetters, ordinal: Int) =>
recordConsumer.addLong(row.getLong(ordinal))
@@ -253,6 +253,10 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
// MICROS time unit.
(row: SpecializedGetters, ordinal: Int) =>
recordConsumer.addLong(row.getLong(ordinal))
+ case _: TimeType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ recordConsumer.addLong(DateTimeUtils.nanosToMicros(row.getLong(ordinal)))
+
case BinaryType =>
(row: SpecializedGetters, ordinal: Int) =>
recordConsumer.addBinary(Binary.fromReusedByteArray(row.getBinary(ordinal)))
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index d78b4a426e70..f52b0bdd8790 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -41,8 +41,8 @@ import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.{DateTimeConstants, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.localTime
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -1623,7 +1623,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
val writer = createParquetWriter(schema, tablePath, dictionaryEnabled = dictEnabled)
(0 until numRecords).foreach { i =>
val record = new SimpleGroup(schema)
- record.add(0, localTime(23, 59, 59, 123456))
+ record.add(0, localTime(23, 59, 59, 123456) / DateTimeConstants.NANOS_PER_MICROS)
writer.write(record)
}
writer.close
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]