This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 72a95bcad7f1 [SPARK-47324][SQL] Add missing timestamp conversion for JDBC nested types
72a95bcad7f1 is described below
commit 72a95bcad7f1906c97fb0971ed6338374ec3009d
Author: Kent Yao <[email protected]>
AuthorDate: Mon Mar 11 09:34:12 2024 +0900
[SPARK-47324][SQL] Add missing timestamp conversion for JDBC nested types
### What changes were proposed in this pull request?
[SPARK-44280](https://issues.apache.org/jira/browse/SPARK-44280) added a new API, convertJavaTimestampToTimestamp, which is called only for plain timestamp columns.
This PR applies the same conversion to timestamps nested in arrays.
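For illustration only (not part of the patch): a minimal usage sketch of reading the affected table through Spark's JDBC source. The connection options are placeholders, and it assumes a PostgreSQL instance containing the `infinity_timestamp` table from the integration test, with the PostgreSQL JDBC driver on the classpath.

```scala
import org.apache.spark.sql.SparkSession

object ReadNestedTimestamps {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("nested-timestamps")
      .master("local[*]")
      .getOrCreate()

    // Read a table whose TIMESTAMP[] column contains 'infinity'/'-infinity'.
    // With this change, array elements go through the same dialect conversion
    // as plain TIMESTAMP columns. All option values below are placeholders.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost:5432/testdb")
      .option("dbtable", "infinity_timestamp")
      .option("user", "postgres")
      .option("password", "password")
      .load()

    df.show(truncate = false)
    spark.stop()
  }
}
```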
### Why are the changes needed?
Data consistency and correctness: timestamps inside arrays should go through the same dialect conversion as plain timestamp columns.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
new tests
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #45435 from yaooqinn/SPARK-47324.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../spark/sql/jdbc/PostgresIntegrationSuite.scala | 17 +++++---
.../sql/execution/datasources/jdbc/JdbcUtils.scala | 46 +++++++++-------------
2 files changed, 29 insertions(+), 34 deletions(-)
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index 2d1c0314f27b..04e31679f386 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -23,8 +23,7 @@ import java.text.SimpleDateFormat
import java.time.{LocalDateTime, ZoneOffset}
import java.util.Properties
-import org.apache.spark.sql.Column
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Column, Row}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.{ArrayType, DecimalType, FloatType, ShortType}
import org.apache.spark.tags.DockerTest
@@ -149,9 +148,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
|('2013-04-05 18:01:02.123456')""".stripMargin).executeUpdate()
conn.prepareStatement("CREATE TABLE infinity_timestamp" +
- "(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP);").executeUpdate()
- conn.prepareStatement("INSERT INTO infinity_timestamp (timestamp_column)" +
- " VALUES ('infinity'), ('-infinity');").executeUpdate()
+ "(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP, timestamp_array
TIMESTAMP[])")
+ .executeUpdate()
+ conn.prepareStatement("INSERT INTO infinity_timestamp (timestamp_column,
timestamp_array)" +
+ " VALUES ('infinity', ARRAY[TIMESTAMP 'infinity']), " +
+ "('-infinity', ARRAY[TIMESTAMP '-infinity'])")
+ .executeUpdate()
conn.prepareStatement("CREATE DOMAIN not_null_text AS TEXT DEFAULT
''").executeUpdate()
conn.prepareStatement("create table custom_type(type_array
not_null_text[]," +
@@ -447,10 +449,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(row.length == 2)
val infinity = row(0).getAs[Timestamp]("timestamp_column")
val negativeInfinity = row(1).getAs[Timestamp]("timestamp_column")
+ val infinitySeq = row(0).getAs[scala.collection.Seq[Timestamp]]("timestamp_array")
+ val negativeInfinitySeq = row(1).getAs[scala.collection.Seq[Timestamp]]("timestamp_array")
val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)
val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)
-
assert(infinity.getTime == maxTimestamp)
assert(negativeInfinity.getTime == minTimeStamp)
+ assert(infinitySeq.head.getTime == maxTimestamp)
+ assert(negativeInfinitySeq.head.getTime == minTimeStamp)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index b5e78ba32cd5..a7bbb832a839 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.execution.datasources.jdbc
-import java.sql.{Connection, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
+import java.math.{BigDecimal => JBigDecimal}
+import java.sql.{Connection, Date, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException, Timestamp}
import java.time.{Instant, LocalDate}
import java.util
import java.util.concurrent.TimeUnit
@@ -414,7 +415,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
case DecimalType.Fixed(p, s) =>
(rs: ResultSet, row: InternalRow, pos: Int) =>
val decimal =
- nullSafeConvert[java.math.BigDecimal](rs.getBigDecimal(pos + 1), d => Decimal(d, p, s))
+ nullSafeConvert[JBigDecimal](rs.getBigDecimal(pos + 1), d => Decimal(d, p, s))
row.update(pos, decimal)
case DoubleType =>
@@ -508,37 +509,22 @@ object JdbcUtils extends Logging with SQLConfHelper {
case ArrayType(et, _) =>
val elementConversion = et match {
- case TimestampType =>
- (array: Object) =>
- array.asInstanceOf[Array[java.sql.Timestamp]].map { timestamp =>
- nullSafeConvert(timestamp, fromJavaTimestamp)
- }
+ case TimestampType => arrayConverter[Timestamp] {
+ (t: Timestamp) => fromJavaTimestamp(dialect.convertJavaTimestampToTimestamp(t))
+ }
case TimestampNTZType =>
- (array: Object) =>
- array.asInstanceOf[Array[java.sql.Timestamp]].map { timestamp =>
- nullSafeConvert(timestamp, (t: java.sql.Timestamp) =>
- localDateTimeToMicros(dialect.convertJavaTimestampToTimestampNTZ(t)))
- }
+ arrayConverter[Timestamp] {
+ (t: Timestamp) => localDateTimeToMicros(dialect.convertJavaTimestampToTimestampNTZ(t))
+ }
case StringType =>
- (array: Object) =>
- // some underling types are not String such as uuid, inet, cidr, etc.
- array.asInstanceOf[Array[java.lang.Object]]
- .map(obj => if (obj == null) null else UTF8String.fromString(obj.toString))
-
- case DateType =>
- (array: Object) =>
- array.asInstanceOf[Array[java.sql.Date]].map { date =>
- nullSafeConvert(date, fromJavaDate)
- }
+ arrayConverter[Object]((obj: Object) => UTF8String.fromString(obj.toString))
+
+ case DateType => arrayConverter[Date](fromJavaDate)
case dt: DecimalType =>
- (array: Object) =>
- array.asInstanceOf[Array[java.math.BigDecimal]].map { decimal =>
- nullSafeConvert[java.math.BigDecimal](
- decimal, d => Decimal(d, dt.precision, dt.scale))
- }
+ arrayConverter[java.math.BigDecimal](d => Decimal(d, dt.precision, dt.scale))
case LongType if metadata.contains("binarylong") =>
throw QueryExecutionErrors.unsupportedArrayElementTypeBasedOnBinaryError(dt)
@@ -552,7 +538,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
(rs: ResultSet, row: InternalRow, pos: Int) =>
val array = nullSafeConvert[java.sql.Array](
input = rs.getArray(pos + 1),
- array => new GenericArrayData(elementConversion.apply(array.getArray)))
+ array => new GenericArrayData(elementConversion(array.getArray)))
row.update(pos, array)
case _ => throw QueryExecutionErrors.unsupportedJdbcTypeError(dt.catalogString)
@@ -566,6 +552,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
}
}
+ private def arrayConverter[T](elementConvert: T => Any): Any => Any = (array: Any) => {
+ array.asInstanceOf[Array[T]].map(e => nullSafeConvert(e, elementConvert))
+ }
+
// A `JDBCValueSetter` is responsible for setting a value from `Row` into a field for
// `PreparedStatement`. The last argument `Int` means the index for the value to be set
// in the SQL statement and also used for the value in `Row`.
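For readers outside the Spark tree, a self-contained sketch of the null-safe, element-wise conversion pattern that the new `arrayConverter` helper factors out. Only `arrayConverter` and `nullSafeConvert` mirror the code above; the object name and the toy element conversion are illustrative.

```scala
import java.sql.Timestamp

object ArrayConverterSketch {
  // Skip nulls; otherwise apply the element conversion.
  private def nullSafeConvert[T](input: T, f: T => Any): Any =
    if (input == null) null else f(input)

  // Generic element-wise converter over a JDBC-provided Array[T].
  private def arrayConverter[T](elementConvert: T => Any): Any => Any =
    (array: Any) => array.asInstanceOf[Array[T]].map(e => nullSafeConvert(e, elementConvert))

  def main(args: Array[String]): Unit = {
    // Toy conversion standing in for
    // fromJavaTimestamp(dialect.convertJavaTimestampToTimestamp(t)):
    // simply convert to epoch microseconds.
    val toMicros = (t: Timestamp) => t.getTime * 1000L

    val input: Array[Timestamp] = Array(new Timestamp(0L), null)
    val converted = arrayConverter[Timestamp](toMicros)(input).asInstanceOf[Array[Any]]
    println(converted.mkString(", ")) // prints: 0, null
  }
}
```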