This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 028091f  [SPARK-31183][SQL] Rebase date/timestamp from/to Julian calendar in Avro
028091f is described below

commit 028091fea4bfb3bc1a8ae114fd01be2c9bfcb82d
Author: Maxim Gekk <[email protected]>
AuthorDate: Fri Mar 20 13:57:49 2020 +0800

    [SPARK-31183][SQL] Rebase date/timestamp from/to Julian calendar in Avro
    
    The PR addresses the issue of compatibility with Spark 2.4 and earlier versions in reading/writing dates and timestamps via the **Avro** datasource. Previous releases are based on a hybrid calendar - Julian + Gregorian. Since Spark 3.0, the Proleptic Gregorian calendar is used by default, see SPARK-26651. In particular, the issue pops up for dates/timestamps before 1582-10-15, when the hybrid calendar switches between the Julian and Gregorian calendars. The same local date in different calendars is converted to a different number of days since the epoch 1970-01-01. For example, the date 0001-01-01 is converted to:
    - -719164 in the Julian calendar. Spark 2.4 saves the number as a value of DATE type into **Avro** files.
    - -719162 in the Proleptic Gregorian calendar. Spark 3.0 saves the number as a date value.
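    For illustration (not part of the patch), both day counts can be reproduced on the JVM: `java.time` implements the Proleptic Gregorian calendar, while `java.util.GregorianCalendar` defaults to the hybrid one:
    ```scala
    import java.time.LocalDate
    import java.util.{Calendar, TimeZone}

    // Proleptic Gregorian calendar: 0001-01-01 -> -719162 days since 1970-01-01
    val gregorianDays = LocalDate.of(1, 1, 1).toEpochDay

    // Hybrid Julian + Gregorian calendar: the same local date -> -719164
    val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
    cal.clear()
    cal.set(1, Calendar.JANUARY, 1)
    val hybridDays = Math.floorDiv(cal.getTimeInMillis, 86400000L)
    ```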
    
    The PR proposes rebasing from/to the Proleptic Gregorian calendar to/from the hybrid one under the SQL config:
    ```
    spark.sql.legacy.avro.rebaseDateTime.enabled
    ```
    which is set to `false` by default, meaning the rebasing is not performed.
    
    The details of the implementation:
    1. Re-use 2 methods of `DateTimeUtils` added by the PR https://github.com/apache/spark/pull/27915 for rebasing microseconds.
    2. Re-use 2 methods of `DateTimeUtils` added by the PR https://github.com/apache/spark/pull/27915 for rebasing days.
    3. Use `rebaseGregorianToJulianMicros()` and `rebaseGregorianToJulianDays()` while saving timestamps/dates to **Avro** files if the SQL config is on.
    4. Use `rebaseJulianToGregorianMicros()` and `rebaseJulianToGregorianDays()` while loading timestamps/dates from **Avro** files if the SQL config is on.
    5. The SQL config `spark.sql.legacy.avro.rebaseDateTime.enabled` controls conversions from/to dates, and timestamps of the `timestamp-millis` and `timestamp-micros` logical types. A conceptual sketch of the rebasing itself follows this list.
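    Conceptually, rebasing interprets the stored number as a local date/timestamp in the source calendar and re-encodes that local value in the target calendar. A rough sketch of the Julian -> Proleptic Gregorian direction for days (an illustration of the idea only, assuming AD dates; `DateTimeUtils.rebaseJulianToGregorianDays` is the authoritative implementation):
    ```scala
    import java.time.LocalDate
    import java.util.{Calendar, TimeZone}

    def rebaseJulianToGregorianDaysSketch(julianDays: Int): Int = {
      // Interpret the day count as a local date in the hybrid Julian + Gregorian calendar
      val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
      cal.clear()
      cal.setTimeInMillis(julianDays.toLong * 86400000L)
      // Re-encode the same local date in the Proleptic Gregorian calendar
      LocalDate.of(
        cal.get(Calendar.YEAR),
        cal.get(Calendar.MONTH) + 1,
        cal.get(Calendar.DAY_OF_MONTH)).toEpochDay.toInt
    }

    rebaseJulianToGregorianDaysSketch(-719164)  // returns -719162
    ```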
    
    For backward compatibility with Spark 2.4 and earlier versions, the changes allow users to read dates/timestamps saved by a previous version and get the same result. Also, after the changes, users can enable the rebasing in write and save dates/timestamps that can be loaded correctly by Spark 2.4 and earlier versions.
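    For instance, with the config enabled, a pre-1582 date written by Spark 3.0 round-trips and uses the same day numbering on disk as Spark 2.4 (a usage sketch; the path is a placeholder):
    ```scala
    scala> spark.conf.set("spark.sql.legacy.avro.rebaseDateTime.enabled", true)
    scala> Seq("1001-01-01").toDF("dateS").select($"dateS".cast("date").as("date")).write.format("avro").save("/tmp/rebased_date_avro")
    scala> spark.read.format("avro").load("/tmp/rebased_date_avro").show(false)
    +----------+
    |date      |
    +----------+
    |1001-01-01|
    +----------+
    ```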
    
    Yes, for example the date `1001-01-01` saved by Spark 2.4.5 is interpreted by Spark 3.0.0-preview2 differently:
    ```scala
    scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
    scala> spark.read.format("avro").load("/Users/maxim/tmp/before_1582/2_4_5_date_avro").show(false)
    +----------+
    |date      |
    +----------+
    |1001-01-07|
    +----------+
    ```
    After the changes:
    ```scala
    scala> spark.conf.set("spark.sql.legacy.avro.rebaseDateTime.enabled", true)
    scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
    
    scala> spark.read.format("avro").load("/Users/maxim/tmp/before_1582/2_4_5_date_avro").show(false)
    +----------+
    |date      |
    +----------+
    |1001-01-01|
    +----------+
    ```
    
    1. Added tests to `AvroLogicalTypeSuite` to check rebasing in read. The tests read back Avro files saved by Spark 2.4.5, which were generated via:
    ```shell
    $ export TZ="America/Los_Angeles"
    ```
    ```scala
    scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
    scala> val df = Seq("1001-01-01").toDF("dateS").select($"dateS".cast("date").as("date"))
    df: org.apache.spark.sql.DataFrame = [date: date]
    scala> df.write.format("avro").save("/Users/maxim/tmp/before_1582/2_4_5_date_avro")
    
    scala> val df2 = Seq("1001-01-01 01:02:03.123456").toDF("tsS").select($"tsS".cast("timestamp").as("ts"))
    df2: org.apache.spark.sql.DataFrame = [ts: timestamp]
    scala> df2.write.format("avro").save("/Users/maxim/tmp/before_1582/2_4_5_ts_avro")
    
    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
      val timestampSchema = s"""
        |  {
        |    "namespace": "logical",
        |    "type": "record",
        |    "name": "test",
        |    "fields": [
        |      {"name": "ts", "type": ["null", {"type": "long","logicalType": "timestamp-millis"}], "default": null}
        |    ]
        |  }
        |""".stripMargin
    
    // Exiting paste mode, now interpreting.
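    // NOTE: the definition of df3 is not shown in the original snippet;
    // presumably it is a timestamp DataFrame like df2 above.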
    scala> df3.write.format("avro").option("avroSchema", timestampSchema).save("/Users/maxim/tmp/before_1582/2_4_5_ts_millis_avro")
    
    ```
    
    2. Added the following tests to `AvroLogicalTypeSuite` to check rebasing of dates/timestamps (in microsecond and millisecond precision). The tests write rebased dates/timestamps, read them back with rebasing enabled and disabled, and compare the results:
      - `rebasing microseconds timestamps in write`
      - `rebasing milliseconds timestamps in write`
      - `rebasing dates in write`
    
    Closes #27953 from MaxGekk/rebase-avro-datetime.
    
    Authored-by: Maxim Gekk <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 4766a3664729644b5391b13805cdee44501025d8)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  34 +++++--
 .../org/apache/spark/sql/avro/AvroSerializer.scala |  26 ++++--
 .../src/test/resources/before_1582_date_v2_4.avro  | Bin 0 -> 202 bytes
 .../test/resources/before_1582_ts_micros_v2_4.avro | Bin 0 -> 218 bytes
 .../test/resources/before_1582_ts_millis_v2_4.avro | Bin 0 -> 244 bytes
 .../spark/sql/avro/AvroLogicalTypeSuite.scala      |  98 ++++++++++++++++++++-
 .../org/apache/spark/sql/internal/SQLConf.scala    |  15 ++++
 7 files changed, 158 insertions(+), 15 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 2c17c16..b98f303 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -32,8 +32,9 @@ import org.apache.avro.util.Utf8
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 /**
@@ -42,6 +43,9 @@ import org.apache.spark.unsafe.types.UTF8String
 class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
   private lazy val decimalConversions = new DecimalConversion()
 
+  // Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar
+  private val rebaseDateTime = SQLConf.get.avroRebaseDateTimeEnabled
+
   private val converter: Any => Any = rootCatalystType match {
     // A shortcut for empty schema.
     case st: StructType if st.isEmpty =>
@@ -88,6 +92,11 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
       case (INT, IntegerType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, value.asInstanceOf[Int])
 
+      case (INT, DateType) if rebaseDateTime => (updater, ordinal, value) =>
+        val days = value.asInstanceOf[Int]
+        val rebasedDays = DateTimeUtils.rebaseJulianToGregorianDays(days)
+        updater.setInt(ordinal, rebasedDays)
+
       case (INT, DateType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, value.asInstanceOf[Int])
 
@@ -95,14 +104,23 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
         updater.setLong(ordinal, value.asInstanceOf[Long])
 
       case (LONG, TimestampType) => avroType.getLogicalType match {
-        case _: TimestampMillis => (updater, ordinal, value) =>
-          updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
+        // For backward compatibility, if the Avro type is Long and it is not logical type
+        // (the `null` case), the value is processed as timestamp type with millisecond precision.
+        case null | _: TimestampMillis => (updater, ordinal, value) =>
+          val millis = value.asInstanceOf[Long]
+          val micros = DateTimeUtils.fromMillis(millis)
+          if (rebaseDateTime) {
+            updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
+          } else {
+            updater.setLong(ordinal, micros)
+          }
         case _: TimestampMicros => (updater, ordinal, value) =>
-          updater.setLong(ordinal, value.asInstanceOf[Long])
-        case null => (updater, ordinal, value) =>
-          // For backward compatibility, if the Avro type is Long and it is not logical type,
-          // the value is processed as timestamp type with millisecond precision.
-          updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
+          val micros = value.asInstanceOf[Long]
+          if (rebaseDateTime) {
+            updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
+          } else {
+            updater.setLong(ordinal, micros)
+          }
        case other => throw new IncompatibleSchemaException(
          s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
       }
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index b7bf7e5..af9e3a5 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -34,6 +34,8 @@ import org.apache.avro.util.Utf8
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
@@ -42,6 +44,9 @@ import org.apache.spark.sql.types._
 class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean)
   extends Logging {
 
+  // Whether to rebase datetimes from Gregorian to Julian calendar in write
+  private val rebaseDateTime: Boolean = SQLConf.get.avroRebaseDateTimeEnabled
+
   def serialize(catalystData: Any): Any = {
     converter.apply(catalystData)
   }
@@ -135,15 +140,26 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
       case (BinaryType, BYTES) =>
         (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
 
+      case (DateType, INT) if rebaseDateTime =>
+        (getter, ordinal) => DateTimeUtils.rebaseGregorianToJulianDays(getter.getInt(ordinal))
+
       case (DateType, INT) =>
         (getter, ordinal) => getter.getInt(ordinal)
 
       case (TimestampType, LONG) => avroType.getLogicalType match {
-          case _: TimestampMillis => (getter, ordinal) => getter.getLong(ordinal) / 1000
-          case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
-          // For backward compatibility, if the Avro type is Long and it is not logical type,
-          // output the timestamp value as with millisecond precision.
-          case null => (getter, ordinal) => getter.getLong(ordinal) / 1000
+          // For backward compatibility, if the Avro type is Long and it is not logical type
+          // (the `null` case), output the timestamp value as with millisecond precision.
+          case null | _: TimestampMillis => (getter, ordinal) =>
+            val micros = getter.getLong(ordinal)
+            val rebasedMicros = if (rebaseDateTime) {
+              DateTimeUtils.rebaseGregorianToJulianMicros(micros)
+            } else micros
+            DateTimeUtils.toMillis(rebasedMicros)
+          case _: TimestampMicros => (getter, ordinal) =>
+            val micros = getter.getLong(ordinal)
+            if (rebaseDateTime) {
+              DateTimeUtils.rebaseGregorianToJulianMicros(micros)
+            } else micros
          case other => throw new IncompatibleSchemaException(
            s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
         }
diff --git a/external/avro/src/test/resources/before_1582_date_v2_4.avro b/external/avro/src/test/resources/before_1582_date_v2_4.avro
new file mode 100644
index 0000000..96aa7cb
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_date_v2_4.avro differ
diff --git a/external/avro/src/test/resources/before_1582_ts_micros_v2_4.avro b/external/avro/src/test/resources/before_1582_ts_micros_v2_4.avro
new file mode 100644
index 0000000..efe5e71
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_ts_micros_v2_4.avro differ
diff --git a/external/avro/src/test/resources/before_1582_ts_millis_v2_4.avro b/external/avro/src/test/resources/before_1582_ts_millis_v2_4.avro
new file mode 100644
index 0000000..dbaec81
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_ts_millis_v2_4.avro differ
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
index 8256965..9e89b69 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.avro
 
 import java.io.File
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 
 import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Conversions.DecimalConversion
@@ -25,7 +25,7 @@ import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -348,6 +348,100 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
       assert(msg.contains("Unscaled value too large for precision"))
     }
   }
+
+  private def readResourceAvroFile(name: String): DataFrame = {
+    val url = Thread.currentThread().getContextClassLoader.getResource(name)
+    spark.read.format("avro").load(url.toString)
+  }
+
+  test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
+    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+      checkAnswer(
+        readResourceAvroFile("before_1582_date_v2_4.avro"),
+        Row(java.sql.Date.valueOf("1001-01-01")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
+    }
+  }
+
+  test("SPARK-31183: rebasing microseconds timestamps in write") {
+    val tsStr = "1001-01-01 01:02:03.123456"
+    val nonRebased = "1001-01-07 01:09:05.123456"
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+        Seq(tsStr).toDF("tsS")
+          .select($"tsS".cast("timestamp").as("ts"))
+          .write.format("avro")
+          .save(path)
+
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
+      }
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
+      }
+    }
+  }
+
+  test("SPARK-31183: rebasing milliseconds timestamps in write") {
+    val tsStr = "1001-01-01 01:02:03.123456"
+    val rebased = "1001-01-01 01:02:03.123"
+    val nonRebased = "1001-01-07 01:09:05.123"
+    Seq(
+      """{"type": "long","logicalType": "timestamp-millis"}""",
+      """"long"""").foreach { tsType =>
+      val timestampSchema = s"""
+          |{
+          |  "namespace": "logical",
+          |  "type": "record",
+          |  "name": "test",
+          |  "fields": [
+          |    {"name": "ts", "type": $tsType}
+          |  ]
+          |}""".stripMargin
+      withTempPath { dir =>
+        val path = dir.getAbsolutePath
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+          Seq(tsStr).toDF("tsS")
+            .select($"tsS".cast("timestamp").as("ts"))
+            .write
+            .option("avroSchema", timestampSchema)
+            .format("avro")
+            .save(path)
+
+          checkAnswer(
+            spark.read.schema("ts timestamp").format("avro").load(path),
+            Row(Timestamp.valueOf(rebased)))
+        }
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+          checkAnswer(
+            spark.read.schema("ts timestamp").format("avro").load(path),
+            Row(Timestamp.valueOf(nonRebased)))
+        }
+      }
+    }
+  }
+
+  test("SPARK-31183: rebasing dates in write") {
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+        Seq("1001-01-01").toDF("dateS")
+          .select($"dateS".cast("date").as("date"))
+          .write.format("avro")
+          .save(path)
+
+        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
+      }
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
+      }
+    }
+  }
 }
 
 class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a47f7d9..9681a5d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2245,6 +2245,19 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_AVRO_REBASE_DATETIME =
+    buildConf("spark.sql.legacy.avro.rebaseDateTime.enabled")
+      .internal()
+      .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
+        "to the hybrid calendar (Julian + Gregorian) in write and " +
+        "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
+        "The rebasing is performed by converting micros/millis/days to " +
+        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
+        "timestamp in the target calendar, and getting the number of micros/millis/days " +
+        "since the epoch 1970-01-01 00:00:00Z.")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
@@ -2822,6 +2835,8 @@ class SQLConf extends Serializable with Logging {
 
   def parquetRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME)
 
+  def avroRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
