This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3d3e366 [SPARK-31076][SQL] Convert Catalyst's DATE/TIMESTAMP to Java Date/Timestamp via local date-time
3d3e366 is described below
commit 3d3e366aa836cb7d2295f54e78e544c7b15c9c08
Author: Maxim Gekk <[email protected]>
AuthorDate: Wed Mar 11 20:53:56 2020 +0800
[SPARK-31076][SQL] Convert Catalyst's DATE/TIMESTAMP to Java Date/Timestamp via local date-time
### What changes were proposed in this pull request?
In the PR, I propose to change the conversion of java.sql.Timestamp/Date values
to/from internal values of Catalyst's TimestampType/DateType for dates and
timestamps before the cutover day `1582-10-15` of the Gregorian calendar. The
new conversion constructs a local date-time from microseconds/days since the
epoch, takes each date-time component `year`, `month`, `day`, `hour`, `minute`,
`second` and `second fraction`, and builds java.sql.Timestamp/Date from the
extracted components.
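A minimal standalone sketch of the timestamp direction (the same approach as the `toJavaTimestamp` change below; `microsToInstant` is Spark's `DateTimeUtils` helper, inlined here so the snippet is self-contained, and the JVM default time zone stands in for the session time zone):
```scala
import java.sql.Timestamp
import java.time.{Instant, ZoneId}

// Micros since the epoch -> Instant (DateTimeUtils.microsToInstant, inlined).
def microsToInstant(us: Long): Instant = {
  val secs = Math.floorDiv(us, 1000000L)
  val micros = Math.floorMod(us, 1000000L)
  Instant.ofEpochSecond(secs, micros * 1000L)
}

// Build the Timestamp from the extracted local date-time components, so the
// wall-clock fields survive the calendar switch before 1582-10-15.
def toTimestampViaLocalDateTime(us: Long): Timestamp = {
  val ldt = microsToInstant(us).atZone(ZoneId.systemDefault()).toLocalDateTime
  Timestamp.valueOf(ldt)
}
```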
### Why are the changes needed?
This rebases the underlying time/date offset so that collected
java.sql.Timestamp/Date values have the same local date-time components as the
original values in the Gregorian calendar.
Here is an example that demonstrates the issue:
```scala
scala> sql("select date '1100-10-10'").collect()
res1: Array[org.apache.spark.sql.Row] = Array([1100-10-03])
```
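Internally, days written through Hive's date writables are rebased between the two calendars. A rough standalone sketch of one direction (mirroring the `rebaseGregorianToJulianDays` helper added to `HiveInspectors` below; the literal `86400000L` stands in for `DateTimeConstants.MILLIS_PER_DAY`):
```scala
import java.time.LocalDate
import java.util.{Calendar, TimeZone}

// Proleptic Gregorian days since the epoch -> hybrid Julian/Gregorian days
// carrying the same local date, e.g. -141714 -> -141704 for 1582-01-01.
def rebaseGregorianToJulianDays(daysSinceEpoch: Int): Int = {
  val localDate = LocalDate.ofEpochDay(daysSinceEpoch)
  val utcCal = new Calendar.Builder()
    .setCalendarType("gregory") // hybrid calendar with the 1582-10-15 cutover
    .setTimeZone(TimeZone.getTimeZone("UTC"))
    .setDate(localDate.getYear, localDate.getMonthValue - 1, localDate.getDayOfMonth)
    .build()
  Math.toIntExact(Math.floorDiv(utcCal.getTimeInMillis, 86400000L))
}
```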
### Does this PR introduce any user-facing change?
Yes, after the changes:
```scala
scala> sql("select date '1100-10-10'").collect()
res0: Array[org.apache.spark.sql.Row] = Array([1100-10-10])
```
### How was this patch tested?
By running `DateTimeUtilsSuite`, `DateFunctionsSuite` and
`DateExpressionsSuite`.
Closes #27807 from MaxGekk/rebase-timestamp-before-1582.
Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/util/DateTimeUtils.scala | 39 +++++++++++--
.../spark/sql/execution/HiveResultSuite.scala | 6 +-
.../execution/datasources/orc/OrcColumnVector.java | 3 +-
.../execution/datasources/orc/OrcColumnVector.java | 3 +-
.../org/apache/spark/sql/hive/HiveInspectors.scala | 66 ++++++++++++++++++++--
5 files changed, 102 insertions(+), 15 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index de4c24e..9f207ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -47,6 +47,15 @@ object DateTimeUtils {
// it's 2440587.5, rounding up to compatible with Hive
final val JULIAN_DAY_OF_EPOCH = 2440588
+ final val GREGORIAN_CUTOVER_DAY = LocalDate.of(1582, 10, 15).toEpochDay
+ final val GREGORIAN_CUTOVER_MICROS = instantToMicros(
+ LocalDateTime.of(1582, 10, 15, 0, 0, 0)
+ .atZone(ZoneId.systemDefault())
+ .toInstant)
+ final val GREGORIAN_CUTOVER_MILLIS = microsToMillis(GREGORIAN_CUTOVER_MICROS)
+
+ final val julianCommonEraStart = Timestamp.valueOf("0001-01-01 00:00:00")
+
final val TimeZoneGMT = TimeZone.getTimeZone("GMT")
final val TimeZoneUTC = TimeZone.getTimeZone("UTC")
@@ -86,28 +95,50 @@ object DateTimeUtils {
* Returns the number of days since epoch from java.sql.Date.
*/
def fromJavaDate(date: Date): SQLDate = {
- microsToDays(millisToMicros(date.getTime))
+ if (date.getTime < GREGORIAN_CUTOVER_MILLIS) {
+ val era = if (date.before(julianCommonEraStart)) 0 else 1
+ val localDate = date.toLocalDate.`with`(ChronoField.ERA, era)
+ localDateToDays(localDate)
+ } else {
+ microsToDays(millisToMicros(date.getTime))
+ }
}
/**
* Returns a java.sql.Date from number of days since epoch.
*/
def toJavaDate(daysSinceEpoch: SQLDate): Date = {
- new Date(microsToMillis(daysToMicros(daysSinceEpoch)))
+ if (daysSinceEpoch < GREGORIAN_CUTOVER_DAY) {
+ Date.valueOf(LocalDate.ofEpochDay(daysSinceEpoch))
+ } else {
+ new Date(microsToMillis(daysToMicros(daysSinceEpoch)))
+ }
}
/**
* Returns a java.sql.Timestamp from number of micros since epoch.
*/
def toJavaTimestamp(us: SQLTimestamp): Timestamp = {
- Timestamp.from(microsToInstant(us))
+ if (us < GREGORIAN_CUTOVER_MICROS) {
+ val ldt = microsToInstant(us).atZone(ZoneId.systemDefault()).toLocalDateTime
+ Timestamp.valueOf(ldt)
+ } else {
+ Timestamp.from(microsToInstant(us))
+ }
}
/**
* Returns the number of micros since epoch from java.sql.Timestamp.
*/
def fromJavaTimestamp(t: Timestamp): SQLTimestamp = {
- instantToMicros(t.toInstant)
+ if (t.getTime < GREGORIAN_CUTOVER_MILLIS) {
+ val era = if (t.before(julianCommonEraStart)) 0 else 1
+ val localDateTime = t.toLocalDateTime.`with`(ChronoField.ERA, era)
+ val instant = ZonedDateTime.of(localDateTime, ZoneId.systemDefault()).toInstant
+ instantToMicros(instant)
+ } else {
+ instantToMicros(t.toInstant)
+ }
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
index bb59b12..bf7cbaa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
@@ -23,7 +23,7 @@ class HiveResultSuite extends SharedSparkSession {
import testImplicits._
test("date formatting in hive result") {
- val dates = Seq("2018-12-28", "1582-10-13", "1582-10-14", "1582-10-15")
+ val dates = Seq("2018-12-28", "1582-10-03", "1582-10-04", "1582-10-15")
val df = dates.toDF("a").selectExpr("cast(a as date) as b")
val executedPlan1 = df.queryExecution.executedPlan
val result = HiveResult.hiveResultString(executedPlan1)
@@ -36,8 +36,8 @@ class HiveResultSuite extends SharedSparkSession {
test("timestamp formatting in hive result") {
val timestamps = Seq(
"2018-12-28 01:02:03",
- "1582-10-13 01:02:03",
- "1582-10-14 01:02:03",
+ "1582-10-03 01:02:03",
+ "1582-10-04 01:02:03",
"1582-10-15 01:02:03")
val df = timestamps.toDF("a").selectExpr("cast(a as timestamp) as b")
val executedPlan1 = df.queryExecution.executedPlan
diff --git a/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 9bfad1e..0dfed76 100644
--- a/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
import org.apache.orc.storage.ql.exec.vector.*;
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.TimestampType;
@@ -136,7 +137,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
public long getLong(int rowId) {
int index = getRowIndex(rowId);
if (isTimestamp) {
- return timestampData.time[index] * 1000 + timestampData.nanos[index] / 1000 % 1000;
+ return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
} else {
return longData.vector[index];
}
diff --git a/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 2f1925e..35447fe 100644
--- a/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
import org.apache.hadoop.hive.ql.exec.vector.*;
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.TimestampType;
@@ -136,7 +137,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
public long getLong(int rowId) {
int index = getRowIndex(rowId);
if (isTimestamp) {
- return timestampData.time[index] * 1000 + timestampData.nanos[index] / 1000 % 1000;
+ return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
} else {
return longData.vector[index];
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 0cd9b36..e217c52 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -18,7 +18,8 @@
package org.apache.spark.sql.hive
import java.lang.reflect.{ParameterizedType, Type, WildcardType}
-import java.util.concurrent.TimeUnit._
+import java.time.LocalDate
+import java.util.Calendar
import scala.collection.JavaConverters._
@@ -181,6 +182,33 @@ import org.apache.spark.unsafe.types.UTF8String
*/
private[hive] trait HiveInspectors {
+ private final val JULIAN_CUTOVER_DAY =
+ rebaseGregorianToJulianDays(DateTimeUtils.GREGORIAN_CUTOVER_DAY.toInt)
+
+ private def rebaseJulianToGregorianDays(daysSinceEpoch: Int): Int = {
+ val millis = Math.multiplyExact(daysSinceEpoch, DateTimeConstants.MILLIS_PER_DAY)
+ val utcCal = new Calendar.Builder()
+ .setCalendarType("gregory")
+ .setTimeZone(DateTimeUtils.TimeZoneUTC)
+ .setInstant(millis)
+ .build()
+ val localDate = LocalDate.of(
+ utcCal.get(Calendar.YEAR),
+ utcCal.get(Calendar.MONTH) + 1,
+ utcCal.get(Calendar.DAY_OF_MONTH))
+ Math.toIntExact(localDate.toEpochDay)
+ }
+
+ private def rebaseGregorianToJulianDays(daysSinceEpoch: Int): Int = {
+ val localDate = LocalDate.ofEpochDay(daysSinceEpoch)
+ val utcCal = new Calendar.Builder()
+ .setCalendarType("gregory")
+ .setTimeZone(DateTimeUtils.TimeZoneUTC)
+ .setDate(localDate.getYear, localDate.getMonthValue - 1, localDate.getDayOfMonth)
+ .build()
+ Math.toIntExact(Math.floorDiv(utcCal.getTimeInMillis, DateTimeConstants.MILLIS_PER_DAY))
+ }
+
def javaTypeToDataType(clz: Type): DataType = clz match {
// writable
case c: Class[_] if c == classOf[hadoopIo.DoubleWritable] => DoubleType
@@ -466,7 +494,7 @@ private[hive] trait HiveInspectors {
_ => constant
case poi: WritableConstantTimestampObjectInspector =>
val t = poi.getWritableConstantValue
- val constant = SECONDS.toMicros(t.getSeconds) + NANOSECONDS.toMicros(t.getNanos)
+ val constant = DateTimeUtils.fromJavaTimestamp(t.getTimestamp)
_ => constant
case poi: WritableConstantIntObjectInspector =>
val constant = poi.getWritableConstantValue.get()
@@ -618,7 +646,14 @@ private[hive] trait HiveInspectors {
case x: DateObjectInspector if x.preferWritable() =>
data: Any => {
if (data != null) {
- DateTimeUtils.fromJavaDate(x.getPrimitiveWritableObject(data).get())
+ // Rebasing written days via conversion to local dates.
+ // See the comment for `getDateWritable()`.
+ val daysSinceEpoch = x.getPrimitiveWritableObject(data).getDays
+ if (daysSinceEpoch < JULIAN_CUTOVER_DAY) {
+ rebaseJulianToGregorianDays(daysSinceEpoch)
+ } else {
+ daysSinceEpoch
+ }
} else {
null
}
@@ -634,8 +669,7 @@ private[hive] trait HiveInspectors {
case x: TimestampObjectInspector if x.preferWritable() =>
data: Any => {
if (data != null) {
- val t = x.getPrimitiveWritableObject(data)
- SECONDS.toMicros(t.getSeconds) + NANOSECONDS.toMicros(t.getNanos)
+ DateTimeUtils.fromJavaTimestamp(x.getPrimitiveWritableObject(data).getTimestamp)
} else {
null
}
@@ -1012,7 +1046,27 @@ private[hive] trait HiveInspectors {
}
private def getDateWritable(value: Any): hiveIo.DateWritable =
- if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[Int])
+ if (value == null) {
+ null
+ } else {
+ // Rebasing days since the epoch to store the same number of days
+ // as by Spark 2.4 and earlier versions. Spark 3.0 switched to
+ // Proleptic Gregorian calendar (see SPARK-26651), and as a consequence of that,
+ // this affects dates before 1582-10-15. Spark 2.4 and earlier versions use
+ // Julian calendar for dates before 1582-10-15. So, the same local date may
+ // be mapped to different number of days since the epoch in different calendars.
+ // For example:
+ // Proleptic Gregorian calendar: 1582-01-01 -> -141714
+ // Julian calendar: 1582-01-01 -> -141704
+ // The code below converts -141714 to -141704.
+ val daysSinceEpoch = value.asInstanceOf[Int]
+ val rebasedDays = if (daysSinceEpoch < DateTimeUtils.GREGORIAN_CUTOVER_DAY) {
+ rebaseGregorianToJulianDays(daysSinceEpoch)
+ } else {
+ daysSinceEpoch
+ }
+ new hiveIo.DateWritable(rebasedDays)
+ }
private def getTimestampWritable(value: Any): hiveIo.TimestampWritable =
if (value == null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]