This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e8bc176e6fd1 [SPARK-47342][SQL] Support TimestampNTZ for DB2 TIMESTAMP
WITH TIME ZONE
e8bc176e6fd1 is described below
commit e8bc176e6fd145bab4cde6bf38931a7ad4c7eecd
Author: Kent Yao <[email protected]>
AuthorDate: Tue Mar 12 07:33:24 2024 -0700
[SPARK-47342][SQL] Support TimestampNTZ for DB2 TIMESTAMP WITH TIME ZONE
### What changes were proposed in this pull request?
This PR Supports TimestampNTZ for DB2 TIMESTAMP WITH TIME ZONE when
`preferTimestampNTZ` option is set to true by users
### Why are the changes needed?
improve DB2 connector
### Does this PR introduce _any_ user-facing change?
yes, preferTimestampNTZ works for DB2 TIMESTAMP WITH TIME ZONE
### How was this patch tested?
new tests
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #45471 from yaooqinn/SPARK-47342.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala | 14 ++++++++++++++
.../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 10 ++++++++--
.../main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala | 2 +-
.../scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 7 +++++++
.../scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 13 +++++--------
.../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 9 +++++++--
6 files changed, 42 insertions(+), 13 deletions(-)
diff --git
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
index cedb33d491fb..14776047cec4 100644
---
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
+++
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc
import java.math.BigDecimal
import java.sql.{Connection, Date, Timestamp}
+import java.time.LocalDateTime
import java.util.Properties
import org.scalatest.time.SpanSugar._
@@ -224,4 +225,17 @@ class DB2IntegrationSuite extends
DockerJDBCIntegrationSuite {
assert(actual === expected)
}
+
+ test("SPARK-47342: Support TimestampNTZ for DB2 TIMESTAMP WITH TIME ZONE")
{
+ // The test only covers TIMESTAMP WITHOUT TIME ZONE so far; we shall
support
+ // TIMESTAMP WITH TIME ZONE, but I haven't figured out how to mock a TSTZ value.
+ withDefaultTimeZone(UTC) {
+ val df = spark.read.format("jdbc")
+ .option("url", jdbcUrl)
+ .option("preferTimestampNTZ", "true")
+ .option("query", "select ts from dates")
+ .load()
+ checkAnswer(df, Row(LocalDateTime.of(2009, 2, 13, 23, 31, 30)))
+ }
+ }
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index a7bbb832a839..27c032471b57 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -212,8 +212,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
case java.sql.Types.SQLXML => StringType
case java.sql.Types.STRUCT => StringType
case java.sql.Types.TIME => TimestampType
- case java.sql.Types.TIMESTAMP if isTimestampNTZ => TimestampNTZType
- case java.sql.Types.TIMESTAMP => TimestampType
+ case java.sql.Types.TIMESTAMP => getTimestampType(isTimestampNTZ)
case java.sql.Types.TINYINT => IntegerType
case java.sql.Types.VARBINARY => BinaryType
case java.sql.Types.VARCHAR if conf.charVarcharAsString => StringType
@@ -229,6 +228,13 @@ object JdbcUtils extends Logging with SQLConfHelper {
throw QueryExecutionErrors.unrecognizedSqlTypeError(jdbcType, typeName)
}
+ /**
+ * Return TimestampNTZType if isTimestampNTZ; otherwise TimestampType.
+ */
+ def getTimestampType(isTimestampNTZ: Boolean): DataType = {
+ if (isTimestampNTZ) TimestampNTZType else TimestampType
+ }
+
/**
* Returns the schema if the table already exists in the JDBC database.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index 62c31b1c4c5d..ff3e74eae205 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -91,7 +91,7 @@ private object DB2Dialect extends JdbcDialect {
typeName match {
case "DECFLOAT" => Option(DecimalType(38, 18))
case "XML" => Option(StringType)
- case t if (t.startsWith("TIMESTAMP")) => Option(TimestampType) //
TIMESTAMP WITH TIMEZONE
+ case t if t.startsWith("TIMESTAMP") =>
Option(getTimestampType(md.build()))
case _ => None
}
case _ => None
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 6621282647d4..6d67a0d91eae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -744,6 +744,13 @@ abstract class JdbcDialect extends Serializable with
Logging {
def getFullyQualifiedQuotedTableName(ident: Identifier): String = {
(ident.namespace() :+ ident.name()).map(quoteIdentifier).mkString(".")
}
+
+ /**
+ * Return TimestampType/TimestampNTZType based on the metadata.
+ */
+ protected final def getTimestampType(md: Metadata): DataType = {
+ JdbcUtils.getTimestampType(md.getBoolean("isTimestampNTZ"))
+ }
}
/**
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index d19b5ba3e0eb..6852f1f69984 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -64,18 +64,15 @@ private object PostgresDialect extends JdbcDialect with
SQLConfHelper {
} else if ("text".equalsIgnoreCase(typeName)) {
Some(StringType) // sqlType is Types.VARCHAR
} else if (sqlType == Types.ARRAY) {
- val scale = md.build().getLong("scale").toInt
- val isTimestampNTZ = md.build().getBoolean("isTimestampNTZ")
// postgres array type names start with underscore
- toCatalystType(typeName.drop(1), size, scale,
isTimestampNTZ).map(ArrayType(_))
+ toCatalystType(typeName.drop(1), size, md.build()).map(ArrayType(_))
} else None
}
private def toCatalystType(
typeName: String,
precision: Int,
- scale: Int,
- isTimestampNTZ: Boolean): Option[DataType] = typeName match {
+ metadata: Metadata): Option[DataType] = typeName match {
case "bool" => Some(BooleanType)
case "bit" => Some(BinaryType)
case "int2" => Some(ShortType)
@@ -91,10 +88,10 @@ private object PostgresDialect extends JdbcDialect with
SQLConfHelper {
"interval" | "pg_snapshot" =>
Some(StringType)
case "bytea" => Some(BinaryType)
- case "timestamp" | "timestamptz" | "time" | "timetz" =>
- Some(if (isTimestampNTZ) TimestampNTZType else TimestampType)
+ case "timestamp" | "timestamptz" | "time" | "timetz" =>
Some(getTimestampType(metadata))
case "date" => Some(DateType)
- case "numeric" | "decimal" if precision > 0 =>
Some(DecimalType.bounded(precision, scale))
+ case "numeric" | "decimal" if precision > 0 =>
+ Some(DecimalType.bounded(precision, metadata.getLong("scale").toInt))
case "numeric" | "decimal" =>
// SPARK-26538: handle numeric without explicit precision and scale.
Some(DecimalType.SYSTEM_DEFAULT)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index b8ca70e0b175..47d60abd1dd4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -903,6 +903,8 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
test("DB2Dialect type mapping") {
val db2Dialect = JdbcDialects.get("jdbc:db2://127.0.0.1/db")
+ val metadata = new MetadataBuilder().putBoolean("isTimestampNTZ", false)
+
assert(db2Dialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get ==
"CLOB")
assert(db2Dialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get ==
"CHAR(1)")
assert(db2Dialect.getJDBCType(ShortType).map(_.databaseTypeDefinition).get
== "SMALLINT")
@@ -912,8 +914,11 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
assert(db2Dialect.getCatalystType(java.sql.Types.OTHER, "DECFLOAT", 1,
null) ==
Option(DecimalType(38, 18)))
assert(db2Dialect.getCatalystType(java.sql.Types.OTHER, "XML", 1, null) ==
Option(StringType))
- assert(db2Dialect.getCatalystType(java.sql.Types.OTHER, "TIMESTAMP WITH
TIME ZONE", 1, null) ==
- Option(TimestampType))
+ assert(db2Dialect.getCatalystType(
+ java.sql.Types.OTHER, "TIMESTAMP WITH TIME ZONE", 1, metadata) ===
Option(TimestampType))
+ metadata.putBoolean("isTimestampNTZ", true)
+ assert(db2Dialect.getCatalystType(
+ java.sql.Types.OTHER, "TIMESTAMP WITH TIME ZONE", 1, metadata) ===
Option(TimestampNTZType))
}
test("MySQLDialect catalyst type mapping") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]